HADOOP-15176. Enhance IAM Assumed Role support in S3A client.

Contributed by Steve Loughran
This commit is contained in:
Steve Loughran 2018-02-15 15:56:10 +00:00
parent c761e658f6
commit 96c047fbb9
23 changed files with 2154 additions and 463 deletions

View File

@ -89,6 +89,14 @@ public class JsonSerialization<T> {
return classType.getSimpleName();
}
/**
* Get the mapper of this class.
* @return the mapper
*/
public ObjectMapper getMapper() {
return mapper;
}
/**
* Convert from JSON.
*

View File

@ -977,20 +977,21 @@
</property>
<property>
<name>fs.s3a.assumed.role.session.duration</name>
<value>30m</value>
<name>fs.s3a.assumed.role.policy</name>
<value/>
<description>
Duration of assumed roles before a refresh is attempted.
JSON policy to apply to the role.
Only used if AssumedRoleCredentialProvider is the AWS credential provider.
</description>
</property>
<property>
<name>fs.s3a.assumed.role.policy</name>
<value/>
<name>fs.s3a.assumed.role.session.duration</name>
<value>30m</value>
<description>
JSON policy containing more restrictions to apply to the role.
Duration of assumed roles before a refresh is attempted.
Only used if AssumedRoleCredentialProvider is the AWS credential provider.
Range: 15m to 1h
</description>
</property>

View File

@ -604,8 +604,44 @@ public final class LambdaTestUtils {
public static <T> void assertOptionalUnset(String message,
Optional<T> actual) {
Assert.assertNotNull(message, actual);
if (actual.isPresent()) {
Assert.fail("Expected empty option, got " + actual.get().toString());
actual.ifPresent(
t -> Assert.fail("Expected empty option, got " + t.toString()));
}
/**
* Invoke a callable; wrap all checked exceptions with an
* AssertionError.
* @param closure closure to execute
* @param <T> return type of closure
* @return the value of the closure
* @throws AssertionError if the operation raised an IOE or
* other checked exception.
*/
public static <T> T eval(Callable<T> closure) {
try {
return closure.call();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new AssertionError(e.toString(), e);
}
}
/**
* Invoke a callable; wrap all checked exceptions with an
* AssertionError.
* @param closure closure to execute
* @return the value of the closure
* @throws AssertionError if the operation raised an IOE or
* other checked exception.
*/
public static void eval(VoidCallable closure) {
try {
closure.call();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new AssertionError(e.toString(), e);
}
}

View File

@ -493,4 +493,40 @@ public class TestLambdaTestUtils extends Assert {
assertMinRetryCount(0);
}
@Test
public void testEvalToSuccess() {
assertTrue("Eval to success", eval(() -> true));
}
/**
* There's no attempt to wrap an unchecked exception
* with an AssertionError.
*/
@Test
public void testEvalDoesntWrapRTEs() throws Throwable {
intercept(RuntimeException.class, "",
() -> eval(() -> {
throw new RuntimeException("t");
}));
}
/**
* Verify that IOEs are caught and wrapped, and that the
* inner cause is the original IOE.
*/
@Test
public void testEvalDoesWrapIOEs() throws Throwable {
AssertionError ex = intercept(AssertionError.class, "ioe",
() -> eval(() -> {
throw new IOException("ioe");
}));
Throwable cause = ex.getCause();
if (cause == null) {
throw ex;
}
if (!(cause instanceof IOException)) {
throw cause;
}
}
}

View File

@ -94,7 +94,7 @@ public final class Constants {
public static final String ASSUMED_ROLE_CREDENTIALS_PROVIDER =
"fs.s3a.assumed.role.credentials.provider";
/** JSON policy containing more restrictions to apply to the role. */
/** JSON policy containing the policy to apply to the role. */
public static final String ASSUMED_ROLE_POLICY =
"fs.s3a.assumed.role.policy";

View File

@ -1399,9 +1399,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
} catch (MultiObjectDeleteException e) {
// one or more of the operations failed.
List<MultiObjectDeleteException.DeleteError> errors = e.getErrors();
LOG.error("Partial failure of delete, {} errors", errors.size(), e);
LOG.debug("Partial failure of delete, {} errors", errors.size(), e);
for (MultiObjectDeleteException.DeleteError error : errors) {
LOG.error("{}: \"{}\" - {}",
LOG.debug("{}: \"{}\" - {}",
error.getKey(), error.getCode(), error.getMessage());
}
throw e;
@ -1649,7 +1649,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
blockRootDelete(keyVersion.getKey());
}
if (enableMultiObjectsDelete) {
deleteObjects(new DeleteObjectsRequest(bucket).withKeys(keysToDelete));
deleteObjects(new DeleteObjectsRequest(bucket)
.withKeys(keysToDelete)
.withQuiet(true));
} else {
for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) {
deleteObject(keyVersion.getKey());
@ -1684,7 +1686,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
entryPoint(INVOCATION_DELETE);
boolean outcome = innerDelete(innerGetFileStatus(f, true), recursive);
if (outcome) {
maybeCreateFakeParentDirectory(f);
try {
maybeCreateFakeParentDirectory(f);
} catch (AccessDeniedException e) {
LOG.warn("Cannot create directory marker at {}: {}",
f.getParent(), e.toString());
LOG.debug("Failed to create fake dir above {}", f, e);
}
}
return outcome;
} catch (FileNotFoundException e) {
@ -1827,6 +1835,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
* @throws IOException IO problem
* @throws AmazonClientException untranslated AWS client problem
*/
@Retries.RetryTranslated
void maybeCreateFakeParentDirectory(Path path)
throws IOException, AmazonClientException {
Path parent = path.getParent();

View File

@ -30,6 +30,7 @@ import com.amazonaws.services.dynamodbv2.model.LimitExceededException;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Preconditions;
@ -248,6 +249,14 @@ public final class S3AUtils {
ioe = new AWSStatus500Exception(message, ase);
break;
case 200:
if (exception instanceof MultiObjectDeleteException) {
// failure during a bulk delete
return translateMultiObjectDeleteException(message,
(MultiObjectDeleteException) exception);
}
// other 200: FALL THROUGH
default:
// no specific exit code. Choose an IOE subclass based on the class
// of the caught exception
@ -378,6 +387,40 @@ public final class S3AUtils {
return new AWSServiceIOException(message, ex);
}
/**
* A MultiObjectDeleteException is raised if one or more delete objects
* listed in a bulk DELETE operation failed.
* The top-level exception is therefore just "something wasn't deleted",
* but doesn't include the what or the why.
* This translation will extract an AccessDeniedException if that's one of
* the causes, otherwise grabs the status code and uses it in the
* returned exception.
* @param message text for the exception
* @param ex exception to translate
* @return an IOE with more detail.
*/
public static IOException translateMultiObjectDeleteException(String message,
MultiObjectDeleteException ex) {
List<String> keys;
StringBuffer result = new StringBuffer(ex.getErrors().size() * 100);
result.append(message).append(": ");
String exitCode = "";
for (MultiObjectDeleteException.DeleteError error : ex.getErrors()) {
String code = error.getCode();
result.append(String.format("%s: %s: %s%n", code, error.getKey(),
error.getMessage()));
if (exitCode.isEmpty() || "AccessDenied".equals(code)) {
exitCode = code;
}
}
if ("AccessDenied".equals(exitCode)) {
return (IOException) new AccessDeniedException(result.toString())
.initCause(ex);
} else {
return new AWSS3IOException(result.toString(), ex);
}
}
/**
* Get low level details of an amazon exception for logging; multi-line.
* @param e exception
@ -534,7 +577,7 @@ public final class S3AUtils {
* @return the list of classes, possibly empty
* @throws IOException on a failure to load the list.
*/
static Class<?>[] loadAWSProviderClasses(Configuration conf,
public static Class<?>[] loadAWSProviderClasses(Configuration conf,
String key,
Class<?>... defaultValue) throws IOException {
try {
@ -564,7 +607,7 @@ public final class S3AUtils {
* @return the instantiated class
* @throws IOException on any instantiation failure.
*/
static AWSCredentialsProvider createAWSCredentialProvider(
public static AWSCredentialsProvider createAWSCredentialProvider(
Configuration conf, Class<?> credClass) throws IOException {
AWSCredentialsProvider credentials;
String className = credClass.getName();
@ -973,14 +1016,18 @@ public final class S3AUtils {
* iterator.
* @param iterator iterator from a list
* @param eval closure to evaluate
* @return the number of files processed
* @throws IOException anything in the closure, or iteration logic.
*/
public static void applyLocatedFiles(
public static long applyLocatedFiles(
RemoteIterator<LocatedFileStatus> iterator,
CallOnLocatedFileStatus eval) throws IOException {
long count = 0;
while (iterator.hasNext()) {
count++;
eval.call(iterator.next());
}
return count;
}
/**

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
package org.apache.hadoop.fs.s3a.auth;
import java.io.Closeable;
import java.io.IOException;
@ -32,7 +32,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
import org.apache.hadoop.security.UserGroupInformation;
import static org.apache.hadoop.fs.s3a.Constants.*;
@ -44,14 +48,18 @@ import static org.apache.hadoop.fs.s3a.S3AUtils.loadAWSProviderClasses;
* {@code STSAssumeRoleSessionCredentialsProvider} from configuration
* properties, including wiring up the inner authenticator, and,
* unless overridden, creating a session name from the current user.
*
* Classname is used in configuration files; do not move.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class AssumedRoleCredentialProvider implements AWSCredentialsProvider,
Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(AssumedRoleCredentialProvider.class);
public static final String NAME
= "org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider";
= "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider";
static final String E_FORBIDDEN_PROVIDER =
"AssumedRoleCredentialProvider cannot be in "
@ -103,7 +111,7 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider,
ASSUMED_ROLE_SESSION_DURATION_DEFAULT, TimeUnit.SECONDS);
String policy = conf.getTrimmed(ASSUMED_ROLE_POLICY, "");
LOG.info("{}", this);
LOG.debug("{}", this);
STSAssumeRoleSessionCredentialsProvider.Builder builder
= new STSAssumeRoleSessionCredentialsProvider.Builder(arn, sessionName);
builder.withRoleSessionDurationSeconds((int) duration);

View File

@ -0,0 +1,314 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.auth;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.util.JsonSerialization;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
/**
* Jackson Role Model for Role Properties, for API clients and tests.
*
* Doesn't have complete coverage of the entire AWS IAM policy model;
* don't expect to be able to parse everything.
* It can generate simple models.
* @see <a href="https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-arn-format.html">Example S3 Policies</a>
* @see <a href="https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/api-permissions-reference.html">Dynamno DB Permissions</a>
*/
@InterfaceAudience.LimitedPrivate("Tests")
@InterfaceStability.Unstable
public class RoleModel {
public static final String VERSION = "2012-10-17";
public static final String BUCKET_RESOURCE_F = "arn:aws:s3:::%s/%s";
private static final AtomicLong SID_COUNTER = new AtomicLong(0);
private final JsonSerialization<Policy> serialization =
new JsonSerialization<>(Policy.class, false, true);
public RoleModel() {
ObjectMapper mapper = serialization.getMapper();
mapper.enable(SerializationFeature.WRITE_SINGLE_ELEM_ARRAYS_UNWRAPPED);
}
public String toJson(Policy policy) throws JsonProcessingException {
return serialization.toJson(policy);
}
/**
* Statement ID factory.
* @return a statement ID unique for this JVM's life.
*/
public static String newSid() {
SID_COUNTER.incrementAndGet();
return SID_COUNTER.toString();
}
/**
* Map a bool to an effect.
* @param allowed is the statement to allow actions?
* @return the appropriate effect.
*/
public static Effects effect(final boolean allowed) {
return allowed ? Effects.Allow : Effects.Deny;
}
/**
* Create a resource.
* @param bucket bucket
* @param key key
* @param addWildcard add a * to the tail of the key?
* @return a resource for a statement.
*/
@SuppressWarnings("StringConcatenationMissingWhitespace")
public static String resource(String bucket, String key,
boolean addWildcard) {
return String.format(BUCKET_RESOURCE_F, bucket,
key + (addWildcard ? "*" : ""));
}
/**
* Given a path, return the S3 resource to it.
* If {@code isDirectory} is true, a "/" is added to the path.
* This is critical when adding wildcard permissions under
* a directory, and also needed when locking down dir-as-file
* and dir-as-directory-marker access.
* @param path a path
* @param isDirectory is this a directory?
* @param addWildcard add a * to the tail of the key?
* @return a resource for a statement.
*/
public static String resource(Path path,
final boolean isDirectory,
boolean addWildcard) {
String key = pathToKey(path);
if (isDirectory && !key.isEmpty()) {
key = key + "/";
}
return resource(path.toUri().getHost(), key, addWildcard);
}
/**
* Given a directory path, return the S3 resource to it.
* @param path a path
* @return a resource for a statement.
*/
public static String[] directory(Path path) {
String host = path.toUri().getHost();
String key = pathToKey(path);
if (!key.isEmpty()) {
return new String[] {
resource(host, key + "/", true),
resource(host, key, false),
resource(host, key + "/", false),
};
} else {
return new String[]{
resource(host, key, true),
};
}
}
/**
* Variant of {@link S3AFileSystem#pathToKey(Path)} which doesn't care
* about working directories, so can be static and stateless.
* @param path path to map
* @return key or ""
*/
public static String pathToKey(Path path) {
if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
return "";
}
return path.toUri().getPath().substring(1);
}
/**
* Create a statement.
* @param allow allow or deny
* @param scope scope
* @param actions actions
* @return the formatted json statement
*/
public static Statement statement(boolean allow,
String scope,
String... actions) {
return new Statement(RoleModel.effect(allow))
.addActions(actions)
.addResources(scope);
}
/**
* Create a statement.
* If {@code isDirectory} is true, a "/" is added to the path.
* This is critical when adding wildcard permissions under
* a directory, and also needed when locking down dir-as-file
* and dir-as-directory-marker access.
* @param allow allow or deny
* @param path path
* @param isDirectory is this a directory?
* @param actions action
* @return the formatted json statement
*/
public static Statement statement(
final boolean allow,
final Path path,
final boolean isDirectory,
final boolean wildcards,
final String... actions) {
return new Statement(RoleModel.effect(allow))
.addActions(actions)
.addResources(resource(path, isDirectory, wildcards));
}
/**
* From a set of statements, create a policy.
* @param statements statements
* @return the policy
*/
public static Policy policy(Statement... statements) {
return new Policy(statements);
}
/**
* Effect options.
*/
public enum Effects {
Allow,
Deny
}
/**
* Any element in a role.
*/
public static abstract class RoleElt {
protected RoleElt() {
}
/**
* validation operation.
*/
public void validate() {
}
}
/**
* A single statement.
*/
public static class Statement extends RoleElt {
@JsonProperty("Sid")
public String sid = newSid();
/**
* Default effect is Deny; forces callers to switch on Allow.
*/
@JsonProperty("Effect")
public Effects effect;
@JsonProperty("Action")
public List<String> action = new ArrayList<>(1);
@JsonProperty("Resource")
public List<String> resource = new ArrayList<>(1);
public Statement(final Effects effect) {
this.effect = effect;
}
@Override
public void validate() {
checkNotNull(sid, "Sid");
checkNotNull(effect, "Effect");
checkState(!(action.isEmpty()), "Empty Action");
checkState(!(resource.isEmpty()), "Empty Resource");
}
public Statement setAllowed(boolean f) {
effect = effect(f);
return this;
}
public Statement addActions(String... actions) {
Collections.addAll(action, actions);
return this;
}
public Statement addResources(String... resources) {
Collections.addAll(resource, resources);
return this;
}
}
/**
* A policy is one or more statements.
*/
public static class Policy extends RoleElt {
@JsonProperty("Version")
public String version = VERSION;
@JsonProperty("Statement")
public List<Statement> statement;
public Policy(final List<RoleModel.Statement> statement) {
this.statement = statement;
}
public Policy(RoleModel.Statement... statements) {
statement = Arrays.asList(statements);
}
/**
* Validation includes validating all statements.
*/
@Override
public void validate() {
checkNotNull(statement, "Statement");
checkState(VERSION.equals(version), "Invalid Version: %s", version);
statement.stream().forEach((a) -> a.validate());
}
}
}

View File

@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.auth;
import static org.apache.hadoop.fs.s3a.auth.RoleModel.*;
/**
* Operations, statements and policies covering the operations
* needed to work with S3 and S3Guard.
*/
public final class RolePolicies {
private RolePolicies() {
}
/**
* All S3 operations: {@value}.
*/
public static final String S3_ALL_OPERATIONS = "s3:*";
/**
* All S3 buckets: {@value}.
*/
public static final String S3_ALL_BUCKETS = "arn:aws:s3:::*";
public static final String S3_ALL_LIST_OPERATIONS = "s3:List*";
public static final String S3_ALL_LIST_BUCKET = "s3:ListBucket*";
public static final String S3_LIST_BUCKET = "s3:ListBucket";
/**
* This is used by the abort operation in S3A commit work.
*/
public static final String S3_LIST_BUCKET_MULTPART_UPLOADS =
"s3:ListBucketMultipartUploads";
/**
* List multipart upload is needed for the S3A Commit protocols.
*/
public static final String S3_LIST_MULTIPART_UPLOAD_PARTS
= "s3:ListMultipartUploadParts";
/**
* abort multipart upload is needed for the S3A Commit protocols.
*/
public static final String S3_ABORT_MULTIPART_UPLOAD
= "s3:AbortMultipartUpload";
/**
* All s3:Delete* operations.
*/
public static final String S3_ALL_DELETE = "s3:Delete*";
public static final String S3_DELETE_OBJECT = "s3:DeleteObject";
public static final String S3_DELETE_OBJECT_TAGGING
= "s3:DeleteObjectTagging";
public static final String S3_DELETE_OBJECT_VERSION
= "s3:DeleteObjectVersion";
public static final String S3_DELETE_OBJECT_VERSION_TAGGING
= "s3:DeleteObjectVersionTagging";
/**
* All s3:Get* operations.
*/
public static final String S3_ALL_GET = "s3:Get*";
public static final String S3_GET_OBJECT = "s3:GetObject";
public static final String S3_GET_OBJECT_ACL = "s3:GetObjectAcl";
public static final String S3_GET_OBJECT_TAGGING = "s3:GetObjectTagging";
public static final String S3_GET_OBJECT_TORRENT = "s3:GetObjectTorrent";
public static final String S3_GET_OBJECT_VERSION = "s3:GetObjectVersion";
public static final String S3_GET_OBJECT_VERSION_ACL
= "s3:GetObjectVersionAcl";
public static final String S3_GET_OBJECT_VERSION_TAGGING
= "s3:GetObjectVersionTagging";
public static final String S3_GET_OBJECT_VERSION_TORRENT
= "s3:GetObjectVersionTorrent";
/**
* S3 Put*.
* This covers single an multipart uploads, but not list/abort of the latter.
*/
public static final String S3_ALL_PUT = "s3:Put*";
public static final String S3_PUT_OBJECT = "s3:PutObject";
public static final String S3_PUT_OBJECT_ACL = "s3:PutObjectAcl";
public static final String S3_PUT_OBJECT_TAGGING = "s3:PutObjectTagging";
public static final String S3_PUT_OBJECT_VERSION_ACL
= "s3:PutObjectVersionAcl";
public static final String S3_PUT_OBJECT_VERSION_TAGGING
= "s3:PutObjectVersionTagging";
public static final String S3_RESTORE_OBJECT = "s3:RestoreObject";
/**
* Actions needed to read data from S3 through S3A.
*/
public static final String[] S3_PATH_READ_OPERATIONS =
new String[]{
S3_GET_OBJECT,
};
/**
* Actions needed to read data from S3 through S3A.
*/
public static final String[] S3_ROOT_READ_OPERATIONS =
new String[]{
S3_LIST_BUCKET,
S3_LIST_BUCKET_MULTPART_UPLOADS,
S3_GET_OBJECT,
};
/**
* Actions needed to write data to an S3A Path.
* This includes the appropriate read operations.
*/
public static final String[] S3_PATH_RW_OPERATIONS =
new String[]{
S3_ALL_GET,
S3_PUT_OBJECT,
S3_DELETE_OBJECT,
S3_ABORT_MULTIPART_UPLOAD,
S3_LIST_MULTIPART_UPLOAD_PARTS,
};
/**
* Actions needed to write data to an S3A Path.
* This is purely the extra operations needed for writing atop
* of the read operation set.
* Deny these and a path is still readable, but not writeable.
*/
public static final String[] S3_PATH_WRITE_OPERATIONS =
new String[]{
S3_PUT_OBJECT,
S3_DELETE_OBJECT,
S3_ABORT_MULTIPART_UPLOAD
};
/**
* Actions needed for R/W IO from the root of a bucket.
*/
public static final String[] S3_ROOT_RW_OPERATIONS =
new String[]{
S3_LIST_BUCKET,
S3_ALL_GET,
S3_PUT_OBJECT,
S3_DELETE_OBJECT,
S3_ABORT_MULTIPART_UPLOAD,
S3_LIST_MULTIPART_UPLOAD_PARTS,
S3_ALL_LIST_BUCKET,
};
/**
* All DynamoDB operations: {@value}.
*/
public static final String DDB_ALL_OPERATIONS = "dynamodb:*";
public static final String DDB_ADMIN = "dynamodb:*";
public static final String DDB_BATCH_WRITE = "dynamodb:BatchWriteItem";
/**
* All DynamoDB tables: {@value}.
*/
public static final String ALL_DDB_TABLES = "arn:aws:dynamodb:::*";
public static final String WILDCARD = "*";
/**
* Allow all S3 Operations.
*/
public static final Statement STATEMENT_ALL_S3 = statement(true,
S3_ALL_BUCKETS,
S3_ALL_OPERATIONS);
/**
* Statement to allow all DDB access.
*/
public static final Statement STATEMENT_ALL_DDB = statement(true,
ALL_DDB_TABLES, DDB_ALL_OPERATIONS);
/**
* Allow all S3 and S3Guard operations.
*/
public static final Policy ALLOW_S3_AND_SGUARD = policy(
STATEMENT_ALL_S3,
STATEMENT_ALL_DDB
);
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Authentication and permissions support.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.fs.s3a.auth;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -309,7 +309,7 @@ public class CommitOperations {
} catch (FileNotFoundException e) {
LOG.debug("listed file already deleted: {}", pendingFile);
} catch (IOException | IllegalArgumentException e) {
if (outcome == null) {
if (MaybeIOE.NONE.equals(outcome)) {
outcome = new MaybeIOE(makeIOE(pendingFile.toString(), e));
}
} finally {

View File

@ -26,6 +26,9 @@ to obtain the assumed role and refresh it regularly.
By using per-filesystem configuration, it is possible to use different
assumed roles for different buckets.
*IAM Assumed Roles are unlikely to be supported by third-party systems
supporting the S3 APIs.*
## Using IAM Assumed Roles
### Before You Begin
@ -38,14 +41,13 @@ are, how to configure their policies, etc.
* Have the AWS CLI installed, and test that it works there.
* Give the role access to S3, and, if using S3Guard, to DynamoDB.
Trying to learn how IAM Assumed Roles work by debugging stack traces from
the S3A client is "suboptimal".
### <a name="how_it_works"></a> How the S3A connector support IAM Assumed Roles.
To use assumed roles, the client must be configured to use the
*Assumed Role Credential Provider*, `org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider`,
*Assumed Role Credential Provider*, `org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider`,
in the configuration option `fs.s3a.aws.credentials.provider`.
This AWS Credential provider will read in the `fs.s3a.assumed.role` options needed to connect to the
@ -54,7 +56,8 @@ first authenticating with the full credentials, then assuming the specific role
specified. It will then refresh this login at the configured rate of
`fs.s3a.assumed.role.session.duration`
To authenticate with the STS service both for the initial credential retrieval
To authenticate with the [AWS STS service](https://docs.aws.amazon.com/STS/latest/APIReference/Welcome.html)
both for the initial credential retrieval
and for background refreshes, a different credential provider must be
created, one which uses long-lived credentials (secret keys, environment variables).
Short lived credentials (e.g other session tokens, EC2 instance credentials) cannot be used.
@ -76,6 +79,7 @@ the previously created ARN.
<property>
<name>fs.s3a.aws.credentials.provider</name>
<value>org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider</value>
<value>org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider</value>
</property>
<property>
@ -116,7 +120,7 @@ Here are the full set of configuration options.
<value />
<description>
AWS ARN for the role to be assumed.
Requires the fs.s3a.aws.credentials.provider list to contain
Required if the fs.s3a.aws.credentials.provider contains
org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider
</description>
</property>
@ -127,23 +131,27 @@ Here are the full set of configuration options.
<description>
Session name for the assumed role, must be valid characters according to
the AWS APIs.
Only used if AssumedRoleCredentialProvider is the AWS credential provider.
If not set, one is generated from the current Hadoop/Kerberos username.
</description>
</property>
<property>
<name>fs.s3a.assumed.role.policy</name>
<value/>
<description>
JSON policy to apply to the role.
Only used if AssumedRoleCredentialProvider is the AWS credential provider.
</description>
</property>
<property>
<name>fs.s3a.assumed.role.session.duration</name>
<value>30m</value>
<description>
Duration of assumed roles before a refresh is attempted.
</description>
</property>
<property>
<name>fs.s3a.assumed.role.policy</name>
<value/>
<description>
Extra policy containing more restrictions to apply to the role.
Only used if AssumedRoleCredentialProvider is the AWS credential provider.
Range: 15m to 1h
</description>
</property>
@ -152,37 +160,173 @@ Here are the full set of configuration options.
<value/>
<description>
AWS Simple Token Service Endpoint. If unset, uses the default endpoint.
Only used if AssumedRoleCredentialProvider is the AWS credential provider.
</description>
</property>
<property>
<name>fs.s3a.assumed.role.credentials.provider</name>
<value/>
<value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
<description>
Credential providers used to authenticate with the STS endpoint and retrieve
the role tokens.
List of credential providers to authenticate with the STS endpoint and
retrieve short-lived role credentials.
Only used if AssumedRoleCredentialProvider is the AWS credential provider.
If unset, uses "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider".
</description>
</property>
```
## <a name="polices"></a> Restricting S3A operations through AWS Policies
The S3A client needs to be granted specific permissions in order
to work with a bucket.
Here is a non-normative list of the permissions which must be granted
for FileSystem operations to work.
*Disclaimer* The specific set of actions which the S3A connector needs
will change over time.
As more operations are added to the S3A connector, and as the
means by which existing operations are implemented change, the
AWS actions which are required by the client will change.
These lists represent the minimum actions to which the client's principal
must have in order to work with a bucket.
### Read Access Permissions
Permissions which must be granted when reading from a bucket:
| Action | S3A operations |
|--------|----------|
| `s3:ListBucket` | `listStatus()`, `getFileStatus()` and elsewhere |
| `s3:GetObject` | `getFileStatus()`, `open()` and elsewhere |
| `s3:ListBucketMultipartUploads` | Aborting/cleaning up S3A commit operations|
The `s3:ListBucketMultipartUploads` is only needed when committing work
via the [S3A committers](committers.html).
However, it must be granted to the root path in order to safely clean up jobs.
It is simplest to permit this in all buckets, even if it is only actually
needed when writing data.
### Write Access Permissions
These permissions must *also* be granted for write access:
| Action | S3A operations |
|--------|----------|
| `s3:PutObject` | `mkdir()`, `create()`, `rename()`, `delete()` |
| `s3:DeleteObject` | `mkdir()`, `create()`, `rename()`, `delete()` |
| `s3:AbortMultipartUpload` | S3A committer `abortJob()` and `cleanup()` operations |
| `s3:ListMultipartUploadParts` | S3A committer `abortJob()` and `cleanup()` operations |
### Mixed Permissions in a single S3 Bucket
Mixing permissions down the "directory tree" is limited
only to the extent of supporting writeable directories under
read-only parent paths.
*Disclaimer:* When a client lacks write access up the entire
directory tree, there are no guarantees of consistent filesystem
views or operations.
Particular troublespots are "directory markers" and
failures of non-atomic operations, particularly `rename()` and `delete()`.
A directory marker such as `/users/` will not be deleted if the user `alice`
creates a directory `/users/alice` *and* she only has access to `/users/alice`.
When a path or directory is deleted, the parent directory may not exist afterwards.
In the example above, if `alice` deletes `/users/alice` and there are no
other entries under `/users/alice`, then the directory marker `/users/` cannot
be created. The directory `/users` will not exist in listings,
`getFileStatus("/users")` or similar.
Rename will fail if it cannot delete the items it has just copied, that is
`rename(read-only-source, writeable-dest)` will fail &mdash;but only after
performing the COPY of the data.
Even though the operation failed, for a single file copy, the destination
file will exist.
For a directory copy, only a partial copy of the source data may take place
before the permission failure is raised.
*S3Guard*: if [S3Guard](s3guard.html) is used to manage the directory listings,
then after partial failures of rename/copy the DynamoDB tables can get out of sync.
### Example: Read access to the base, R/W to the path underneath
This example has the base bucket read only, and a directory underneath,
`/users/alice/` granted full R/W access.
```json
{
"Version" : "2012-10-17",
"Statement" : [ {
"Sid" : "4",
"Effect" : "Allow",
"Action" : [
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:GetObject"
],
"Resource" : "arn:aws:s3:::example-bucket/*"
}, {
"Sid" : "5",
"Effect" : "Allow",
"Action" : [
"s3:Get*",
"s3:PutObject",
"s3:DeleteObject",
"s3:AbortMultipartUpload",
"s3:ListMultipartUploadParts" ],
"Resource" : [
"arn:aws:s3:::example-bucket/users/alice/*",
"arn:aws:s3:::example-bucket/users/alice",
"arn:aws:s3:::example-bucket/users/alice/"
]
} ]
}
```
Note how three resources are provided to represent the path `/users/alice`
| Path | Matches |
|-------|----------|
| `/users/alice` | Any file `alice` created under `/users` |
| `/users/alice/` | The directory marker `alice/` created under `/users` |
| `/users/alice/*` | All files and directories under the path `/users/alice` |
Note that the resource `arn:aws:s3:::example-bucket/users/alice*` cannot
be used to refer to all of these paths, because it would also cover
adjacent paths like `/users/alice2` and `/users/alicebob`.
## <a name="troubleshooting"></a> Troubleshooting Assumed Roles
1. Make sure the role works and the user trying to enter it can do so from AWS
the command line before trying to use the S3A client.
1. Try to access the S3 bucket with reads and writes from the AWS CLI.
1. Then, with the hadoop settings updated, try to read data from the `hadoop fs` CLI:
1. With the Hadoop configuration set too use the role,
try to read data from the `hadoop fs` CLI:
`hadoop fs -ls -p s3a://bucket/`
1. Then, with the hadoop CLI, try to create a new directory with a request such as
1. With the hadoop CLI, try to create a new directory with a request such as
`hadoop fs -mkdirs -p s3a://bucket/path/p1/`
### <a name="no_role"></a>IOException: "Unset property fs.s3a.assumed.role.arn"
The Assumed Role Credential Provider is enabled, but `fs.s3a.assumed.role.arn` is unset.
```
java.io.IOException: Unset property fs.s3a.assumed.role.arn
at org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider.<init>(AssumedRoleCredentialProvider.java:76)
at org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider.<init>(AssumedRoleCredentialProvider.java:76)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
@ -201,7 +345,7 @@ This can arise if the role ARN set in `fs.s3a.assumed.role.arn` is invalid
or one to which the caller has no access.
```
java.nio.file.AccessDeniedException: : Instantiate org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider
java.nio.file.AccessDeniedException: : Instantiate org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider
on : com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException:
Not authorized to perform sts:AssumeRole (Service: AWSSecurityTokenService; Status Code: 403;
Error Code: AccessDenied; Request ID: aad4e59a-f4b0-11e7-8c78-f36aaa9457f6):AccessDenied
@ -217,12 +361,12 @@ java.nio.file.AccessDeniedException: : Instantiate org.apache.hadoop.fs.s3a.Assu
### <a name="root_account"></a> "Roles may not be assumed by root accounts"
You can't use assume a role with the root acount of an AWS account;
You can't assume a role with the root account of an AWS account;
you need to create a new user and give it the permission to change into
the role.
```
java.nio.file.AccessDeniedException: : Instantiate org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider
java.nio.file.AccessDeniedException: : Instantiate org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider
on : com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException:
Roles may not be assumed by root accounts. (Service: AWSSecurityTokenService; Status Code: 403; Error Code: AccessDenied;
Request ID: e86dfd8f-e758-11e7-88e7-ad127c04b5e2):
@ -257,7 +401,7 @@ The value of `fs.s3a.assumed.role.session.duration` is out of range.
```
java.lang.IllegalArgumentException: Assume Role session duration should be in the range of 15min - 1Hr
at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider$Builder.withRoleSessionDurationSeconds(STSAssumeRoleSessionCredentialsProvider.java:437)
at org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider.<init>(AssumedRoleCredentialProvider.java:86)
at org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider.<init>(AssumedRoleCredentialProvider.java:86)
```
@ -268,7 +412,7 @@ The policy set in `fs.s3a.assumed.role.policy` is not valid according to the
AWS specification of Role Policies.
```
rg.apache.hadoop.fs.s3a.AWSBadRequestException: Instantiate org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider on :
rg.apache.hadoop.fs.s3a.AWSBadRequestException: Instantiate org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider on :
com.amazonaws.services.securitytoken.model.MalformedPolicyDocumentException:
The policy is not in the valid JSON format. (Service: AWSSecurityTokenService; Status Code: 400;
Error Code: MalformedPolicyDocument; Request ID: baf8cb62-f552-11e7-9768-9df3b384e40c):
@ -308,8 +452,8 @@ Caused by: com.amazonaws.services.securitytoken.model.MalformedPolicyDocumentExc
at com.amazonaws.auth.RefreshableTask.blockingRefresh(RefreshableTask.java:212)
at com.amazonaws.auth.RefreshableTask.getValue(RefreshableTask.java:153)
at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.getCredentials(STSAssumeRoleSessionCredentialsProvider.java:299)
at org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider.getCredentials(AssumedRoleCredentialProvider.java:127)
at org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider.<init>(AssumedRoleCredentialProvider.java:116)
at org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider.getCredentials(AssumedRoleCredentialProvider.java:127)
at org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider.<init>(AssumedRoleCredentialProvider.java:116)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
@ -324,7 +468,7 @@ The policy set in `fs.s3a.assumed.role.policy` is not valid JSON.
```
org.apache.hadoop.fs.s3a.AWSBadRequestException:
Instantiate org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider on :
Instantiate org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider on :
com.amazonaws.services.securitytoken.model.MalformedPolicyDocumentException:
Syntax errors in policy. (Service: AWSSecurityTokenService;
Status Code: 400; Error Code: MalformedPolicyDocument;
@ -363,8 +507,8 @@ Instantiate org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider on :
at com.amazonaws.auth.RefreshableTask.blockingRefresh(RefreshableTask.java:212)
at com.amazonaws.auth.RefreshableTask.getValue(RefreshableTask.java:153)
at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.getCredentials(STSAssumeRoleSessionCredentialsProvider.java:299)
at org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider.getCredentials(AssumedRoleCredentialProvider.java:127)
at org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider.<init>(AssumedRoleCredentialProvider.java:116)
at org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider.getCredentials(AssumedRoleCredentialProvider.java:127)
at org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider.<init>(AssumedRoleCredentialProvider.java:116)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
@ -380,7 +524,7 @@ You can't use the Assumed Role Credential Provider as the provider in
```
java.io.IOException: AssumedRoleCredentialProvider cannot be in fs.s3a.assumed.role.credentials.provider
at org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider.<init>(AssumedRoleCredentialProvider.java:86)
at org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider.<init>(AssumedRoleCredentialProvider.java:86)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
@ -401,7 +545,7 @@ There's an space or other typo in the `fs.s3a.access.key` or `fs.s3a.secret.key`
inner authentication which is breaking signature creation.
```
org.apache.hadoop.fs.s3a.AWSBadRequestException: Instantiate org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider
org.apache.hadoop.fs.s3a.AWSBadRequestException: Instantiate org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider
on : com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException:
'valid/20180109/us-east-1/sts/aws4_request' not a valid key=value pair (missing equal-sign) in Authorization header:
'AWS4-HMAC-SHA256 Credential=not valid/20180109/us-east-1/sts/aws4_request,
@ -447,8 +591,8 @@ Caused by: com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceExc
at com.amazonaws.auth.RefreshableTask.blockingRefresh(RefreshableTask.java:212)
at com.amazonaws.auth.RefreshableTask.getValue(RefreshableTask.java:153)
at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.getCredentials(STSAssumeRoleSessionCredentialsProvider.java:299)
at org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider.getCredentials(AssumedRoleCredentialProvider.java:127)
at org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider.<init>(AssumedRoleCredentialProvider.java:116)
at org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider.getCredentials(AssumedRoleCredentialProvider.java:127)
at org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider.<init>(AssumedRoleCredentialProvider.java:116)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
@ -463,7 +607,7 @@ The credentials used to authenticate with the AWS Simple Token Service are inval
```
[ERROR] Failures:
[ERROR] java.nio.file.AccessDeniedException: : Instantiate org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider on :
[ERROR] java.nio.file.AccessDeniedException: : Instantiate org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider on :
com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException:
The security token included in the request is invalid.
(Service: AWSSecurityTokenService; Status Code: 403; Error Code: InvalidClientTokenId;
@ -501,8 +645,8 @@ The security token included in the request is invalid.
at com.amazonaws.auth.RefreshableTask.blockingRefresh(RefreshableTask.java:212)
at com.amazonaws.auth.RefreshableTask.getValue(RefreshableTask.java:153)
at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.getCredentials(STSAssumeRoleSessionCredentialsProvider.java:299)
at org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider.getCredentials(AssumedRoleCredentialProvider.java:127)
at org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider.<init>(AssumedRoleCredentialProvider.java:116)
at org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider.getCredentials(AssumedRoleCredentialProvider.java:127)
at org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider.<init>(AssumedRoleCredentialProvider.java:116)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
@ -521,7 +665,7 @@ match these constraints.
If set explicitly, it must be valid.
```
org.apache.hadoop.fs.s3a.AWSBadRequestException: Instantiate org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider on
org.apache.hadoop.fs.s3a.AWSBadRequestException: Instantiate org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider on
com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException:
1 validation error detected: Value 'Session Names cannot Hava Spaces!' at 'roleSessionName'
failed to satisfy constraint: Member must satisfy regular expression pattern: [\w+=,.@-]*
@ -584,8 +728,8 @@ Caused by: com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceExc
at com.amazonaws.auth.RefreshableTask.blockingRefresh(RefreshableTask.java:212)
at com.amazonaws.auth.RefreshableTask.getValue(RefreshableTask.java:153)
at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.getCredentials(STSAssumeRoleSessionCredentialsProvider.java:299)
at org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider.getCredentials(AssumedRoleCredentialProvider.java:135)
at org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider.<init>(AssumedRoleCredentialProvider.java:124)
at org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider.getCredentials(AssumedRoleCredentialProvider.java:135)
at org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider.<init>(AssumedRoleCredentialProvider.java:124)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
@ -593,3 +737,61 @@ Caused by: com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceExc
at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProvider(S3AUtils.java:583)
... 26 more
```
### <a name="access_denied"></a> `java.nio.file.AccessDeniedException` within a FileSystem API call
If an operation fails with an `AccessDeniedException`, then the role does not have
the permission for the S3 Operation invoked during the call
```
java.nio.file.AccessDeniedException: s3a://bucket/readonlyDir: rename(s3a://bucket/readonlyDir, s3a://bucket/renameDest)
on s3a://bucket/readonlyDir:
com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied
(Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 2805F2ABF5246BB1;
S3 Extended Request ID: iEXDVzjIyRbnkAc40MS8Sjv+uUQNvERRcqLsJsy9B0oyrjHLdkRKwJ/phFfA17Kjn483KSlyJNw=),
S3 Extended Request ID: iEXDVzjIyRbnkAc40MS8Sjv+uUQNvERRcqLsJsy9B0oyrjHLdkRKwJ/phFfA17Kjn483KSlyJNw=:AccessDenied
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:216)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:143)
at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:853)
...
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied
(Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 2805F2ABF5246BB1;
S3 Extended Request ID: iEXDVzjIyRbnkAc40MS8Sjv+uUQNvERRcqLsJsy9B0oyrjHLdkRKwJ/phFfA17Kjn483KSlyJNw=),
S3 Extended Request ID: iEXDVzjIyRbnkAc40MS8Sjv+uUQNvERRcqLsJsy9B0oyrjHLdkRKwJ/phFfA17Kjn483KSlyJNw=
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1638)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1303)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1055)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4229)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4176)
at com.amazonaws.services.s3.AmazonS3Client.deleteObject(AmazonS3Client.java:2066)
at com.amazonaws.services.s3.AmazonS3Client.deleteObject(AmazonS3Client.java:2052)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$deleteObject$7(S3AFileSystem.java:1338)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:314)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:280)
at org.apache.hadoop.fs.s3a.S3AFileSystem.deleteObject(S3AFileSystem.java:1334)
at org.apache.hadoop.fs.s3a.S3AFileSystem.removeKeys(S3AFileSystem.java:1657)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerRename(S3AFileSystem.java:1046)
at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:851)
```
This is the policy restriction behaving as intended: the caller is trying to
perform an action which is forbidden.
1. If a policy has been set in `fs.s3a.assumed.role.policy` then it must declare *all*
permissions which the caller is allowed to perform. The existing role policies
act as an outer constraint on what the caller can perform, but are not inherited.
1. If the policy for a bucket is set up with complex rules on different paths,
check the path for the operation.
1. The policy may have omitted one or more actions which are required.
Make sure that all the read and write permissions are allowed for any bucket/path
to which data is being written to, and read permissions for all
buckets read from.

View File

@ -1043,24 +1043,25 @@ If this role is not set, the tests which require it will be skipped.
To run the tests in `ITestAssumeRole`, you need:
1. A role in your AWS account with the relevant access rights to
the S3 buckets used in the tests, and ideally DynamoDB, for S3Guard.
1. A role in your AWS account will full read and write access rights to
the S3 bucket used in the tests, and ideally DynamoDB, for S3Guard.
If your bucket is set up by default to use S3Guard, the role must have access
to that service.
1. Your IAM User to have the permissions to adopt that role.
1. Your IAM User to have the permissions to adopt that role.
1. The role ARN must be set in `fs.s3a.assumed.role.arn`.
```xml
<property>
<name>fs.s3a.assumed.role.arn</name>
<value>arn:aws:kms:eu-west-1:00000000000:key/0000000-16c9-4832-a1a9-c8bbef25ec8b</value>
<value>arn:aws:iam::9878543210123:role/role-s3-restricted</value>
</property>
```
The tests don't do much other than verify that basic file IO works with the role,
and trigger various failures.
The tests assume the role with different subsets of permissions and verify
that the S3A client (mostly) works when the caller has only write access
to part of the directory tree.
You can also run the entire test suite in an assumed role, a more
thorough test, by switching to the credentials provider.
@ -1068,7 +1069,7 @@ thorough test, by switching to the credentials provider.
```xml
<property>
<name>fs.s3a.aws.credentials.provider</name>
<value>org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider</value>
<value>org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider</value>
</property>
```

View File

@ -1,52 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3a;
import org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider;
import static org.apache.hadoop.fs.s3a.Constants.ASSUMED_ROLE_ARN;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.authenticationContains;
/**
* Run DistCP under an assumed role.
* This is skipped if the FS is already set to run under an assumed role,
* because it would duplicate that of the superclass.
*/
public class ITestS3AContractDistCpAssumedRole extends ITestS3AContractDistCp {
@Override
public void setup() throws Exception {
super.setup();
// check for the fs having assumed roles
assume("No ARN for role tests", !getAssumedRoleARN().isEmpty());
assume("Already running as an assumed role",
!authenticationContains(getFileSystem().getConf(),
AssumedRoleCredentialProvider.NAME));
}
/**
* Probe for an ARN for the test FS.
* @return any ARN for the (previous created) filesystem.
*/
private String getAssumedRoleARN() {
return getFileSystem().getConf().getTrimmed(ASSUMED_ROLE_ARN, "");
}
}

View File

@ -1,324 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.util.concurrent.Callable;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Tests use of assumed roles.
* Only run if an assumed role is provided.
*/
public class ITestAssumeRole extends AbstractS3ATestBase {
private static final Logger LOG =
LoggerFactory.getLogger(ITestAssumeRole.class);
private static final String ARN_EXAMPLE
= "arn:aws:kms:eu-west-1:00000000000:key/" +
"0000000-16c9-4832-a1a9-c8bbef25ec8b";
private static final String E_BAD_ROLE
= "Not authorized to perform sts:AssumeRole";
/**
* This is AWS policy removes read access.
*/
public static final String RESTRICTED_POLICY = "{\n"
+ " \"Version\": \"2012-10-17\",\n"
+ " \"Statement\": [{\n"
+ " \"Effect\": \"Deny\",\n"
+ " \"Action\": \"s3:ListObjects\",\n"
+ " \"Resource\": \"*\"\n"
+ " }\n"
+ " ]\n"
+ "}";
private void assumeRoleTests() {
assume("No ARN for role tests", !getAssumedRoleARN().isEmpty());
}
private String getAssumedRoleARN() {
return getContract().getConf().getTrimmed(ASSUMED_ROLE_ARN, "");
}
/**
* Expect a filesystem to fail to instantiate.
* @param conf config to use
* @param clazz class of exception to expect
* @param text text in exception
* @param <E> type of exception as inferred from clazz
* @throws Exception if the exception was the wrong class
*/
private <E extends Throwable> void expectFileSystemFailure(
Configuration conf,
Class<E> clazz,
String text) throws Exception {
interceptC(clazz,
text,
() -> new Path(getFileSystem().getUri()).getFileSystem(conf));
}
/**
* Experimental variant of intercept() which closes any Closeable
* returned.
*/
private static <E extends Throwable> E interceptC(
Class<E> clazz, String text,
Callable<Closeable> eval)
throws Exception {
return intercept(clazz, text,
() -> {
try (Closeable c = eval.call()) {
return c.toString();
}
});
}
@Test
public void testCreateCredentialProvider() throws IOException {
assumeRoleTests();
describe("Create the credential provider");
String roleARN = getAssumedRoleARN();
Configuration conf = new Configuration(getContract().getConf());
conf.set(AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.NAME);
conf.set(ASSUMED_ROLE_ARN, roleARN);
conf.set(ASSUMED_ROLE_SESSION_NAME, "valid");
conf.set(ASSUMED_ROLE_SESSION_DURATION, "45m");
conf.set(ASSUMED_ROLE_POLICY, RESTRICTED_POLICY);
try (AssumedRoleCredentialProvider provider
= new AssumedRoleCredentialProvider(conf)) {
LOG.info("Provider is {}", provider);
AWSCredentials credentials = provider.getCredentials();
assertNotNull("Null credentials from " + provider, credentials);
}
}
@Test
public void testAssumeRoleCreateFS() throws IOException {
assumeRoleTests();
describe("Create an FS client with the role and do some basic IO");
String roleARN = getAssumedRoleARN();
Configuration conf = createAssumedRoleConfig(roleARN);
conf.set(ASSUMED_ROLE_SESSION_NAME, "valid");
conf.set(ASSUMED_ROLE_SESSION_DURATION, "45m");
Path path = new Path(getFileSystem().getUri());
LOG.info("Creating test FS and user {} with assumed role {}",
conf.get(ACCESS_KEY), roleARN);
try (FileSystem fs = path.getFileSystem(conf)) {
fs.getFileStatus(new Path("/"));
fs.mkdirs(path("testAssumeRoleFS"));
}
}
@Test
public void testAssumeRoleRestrictedPolicyFS() throws Exception {
assumeRoleTests();
describe("Restrict the policy for this session; verify that reads fail");
String roleARN = getAssumedRoleARN();
Configuration conf = createAssumedRoleConfig(roleARN);
conf.set(ASSUMED_ROLE_POLICY, RESTRICTED_POLICY);
Path path = new Path(getFileSystem().getUri());
try (FileSystem fs = path.getFileSystem(conf)) {
intercept(AccessDeniedException.class, "getFileStatus",
() -> fs.getFileStatus(new Path("/")));
intercept(AccessDeniedException.class, "getFileStatus",
() -> fs.listStatus(new Path("/")));
intercept(AccessDeniedException.class, "getFileStatus",
() -> fs.mkdirs(path("testAssumeRoleFS")));
}
}
@Test
public void testAssumeRoleFSBadARN() throws Exception {
assumeRoleTests();
describe("Attemnpt to create the FS with an invalid ARN");
Configuration conf = createAssumedRoleConfig(getAssumedRoleARN());
conf.set(ASSUMED_ROLE_ARN, ARN_EXAMPLE);
expectFileSystemFailure(conf, AccessDeniedException.class, E_BAD_ROLE);
}
@Test
public void testAssumeRoleNoARN() throws Exception {
assumeRoleTests();
describe("Attemnpt to create the FS with no ARN");
Configuration conf = createAssumedRoleConfig(getAssumedRoleARN());
conf.unset(ASSUMED_ROLE_ARN);
expectFileSystemFailure(conf,
IOException.class,
AssumedRoleCredentialProvider.E_NO_ROLE);
}
@Test
public void testAssumeRoleFSBadPolicy() throws Exception {
assumeRoleTests();
describe("Attemnpt to create the FS with malformed JSON");
Configuration conf = createAssumedRoleConfig(getAssumedRoleARN());
// add some malformed JSON
conf.set(ASSUMED_ROLE_POLICY, "}");
expectFileSystemFailure(conf,
AWSBadRequestException.class,
"JSON");
}
@Test
public void testAssumeRoleFSBadPolicy2() throws Exception {
assumeRoleTests();
describe("Attemnpt to create the FS with valid but non-compliant JSON");
Configuration conf = createAssumedRoleConfig(getAssumedRoleARN());
// add some invalid JSON
conf.set(ASSUMED_ROLE_POLICY, "{'json':'but not what AWS wants}");
expectFileSystemFailure(conf,
AWSBadRequestException.class,
"Syntax errors in policy");
}
@Test
public void testAssumeRoleCannotAuthAssumedRole() throws Exception {
assumeRoleTests();
describe("Assert that you can't use assumed roles to auth assumed roles");
Configuration conf = createAssumedRoleConfig(getAssumedRoleARN());
conf.set(ASSUMED_ROLE_CREDENTIALS_PROVIDER,
AssumedRoleCredentialProvider.NAME);
expectFileSystemFailure(conf,
IOException.class,
AssumedRoleCredentialProvider.E_FORBIDDEN_PROVIDER);
}
@Test
public void testAssumeRoleBadInnerAuth() throws Exception {
assumeRoleTests();
describe("Try to authenticate with a keypair with spaces");
Configuration conf = createAssumedRoleConfig(getAssumedRoleARN());
conf.set(ASSUMED_ROLE_CREDENTIALS_PROVIDER,
SimpleAWSCredentialsProvider.NAME);
conf.set(ACCESS_KEY, "not valid");
conf.set(SECRET_KEY, "not secret");
expectFileSystemFailure(conf, AWSBadRequestException.class, "not a valid " +
"key=value pair (missing equal-sign) in Authorization header");
}
@Test
public void testAssumeRoleBadInnerAuth2() throws Exception {
assumeRoleTests();
describe("Try to authenticate with an invalid keypair");
Configuration conf = createAssumedRoleConfig(getAssumedRoleARN());
conf.set(ASSUMED_ROLE_CREDENTIALS_PROVIDER,
SimpleAWSCredentialsProvider.NAME);
conf.set(ACCESS_KEY, "notvalid");
conf.set(SECRET_KEY, "notsecret");
expectFileSystemFailure(conf, AccessDeniedException.class,
"The security token included in the request is invalid");
}
@Test
public void testAssumeRoleBadSession() throws Exception {
assumeRoleTests();
describe("Try to authenticate with an invalid session");
Configuration conf = createAssumedRoleConfig(getAssumedRoleARN());
conf.set(ASSUMED_ROLE_SESSION_NAME,
"Session Names cannot Hava Spaces!");
expectFileSystemFailure(conf, AWSBadRequestException.class,
"Member must satisfy regular expression pattern");
}
/**
* Create a config for an assumed role; it also disables FS caching.
* @param roleARN ARN of role
* @return the configuration
*/
private Configuration createAssumedRoleConfig(String roleARN) {
Configuration conf = new Configuration(getContract().getConf());
conf.set(AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.NAME);
conf.set(ASSUMED_ROLE_ARN, roleARN);
disableFilesystemCaching(conf);
return conf;
}
@Test
public void testAssumedRoleCredentialProviderValidation() throws Throwable {
Configuration conf = new Configuration();
conf.set(ASSUMED_ROLE_ARN, "");
interceptC(IOException.class,
AssumedRoleCredentialProvider.E_NO_ROLE,
() -> new AssumedRoleCredentialProvider(conf));
}
@Test
public void testAssumedDuration() throws Throwable {
assumeRoleTests();
describe("Expect the constructor to fail if the session is to short");
Configuration conf = new Configuration();
conf.set(ASSUMED_ROLE_SESSION_DURATION, "30s");
interceptC(IllegalArgumentException.class, "",
() -> new AssumedRoleCredentialProvider(conf));
}
@Test
public void testAssumedInvalidRole() throws Throwable {
assumeRoleTests();
describe("Expect the constructor to fail if the role is invalid");
Configuration conf = new Configuration();
conf.set(ASSUMED_ROLE_ARN, ARN_EXAMPLE);
interceptC(AWSSecurityTokenServiceException.class,
E_BAD_ROLE,
() -> new AssumedRoleCredentialProvider(conf));
}
/**
* This is here to check up on the S3ATestUtils probes themselves.
* @see S3ATestUtils#authenticationContains(Configuration, String).
*/
@Test
public void testauthenticationContainsProbes() {
Configuration conf = new Configuration(false);
assertFalse("found AssumedRoleCredentialProvider",
authenticationContains(conf, AssumedRoleCredentialProvider.NAME));
conf.set(AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.NAME);
assertTrue("didn't find AssumedRoleCredentialProvider",
authenticationContains(conf, AssumedRoleCredentialProvider.NAME));
}
}

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.fs.s3a;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataInputStream;
@ -39,23 +41,28 @@ import org.junit.internal.AssumptionViolatedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.concurrent.Callable;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.junit.Assert.*;
/**
* Utilities for the S3A tests.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class S3ATestUtils {
private static final Logger LOG = LoggerFactory.getLogger(
S3ATestUtils.class);
@ -455,6 +462,33 @@ public final class S3ATestUtils {
reset(metrics);
}
/**
* Variant of {@code LambdaTestUtils#intercept() which closes the Closeable
* returned by the invoked operation, and using its toString() value
* for exception messages.
* @param clazz class of exception; the raised exception must be this class
* <i>or a subclass</i>.
* @param contained string which must be in the {@code toString()} value
* of the exception
* @param eval expression to eval
* @param <T> return type of expression
* @param <E> exception class
* @return the caught exception if it was of the expected type and contents
*/
public static <E extends Throwable, T extends Closeable> E interceptClosing(
Class<E> clazz,
String contained,
Callable<T> eval)
throws Exception {
return intercept(clazz, contained,
() -> {
try (Closeable c = eval.call()) {
return c.toString();
}
});
}
/**
* Helper class to do diffs of metrics.
*/
@ -762,21 +796,23 @@ public final class S3ATestUtils {
}
/**
* List a directory.
* List a directory/directory tree.
* @param fileSystem FS
* @param path path
* @param recursive do a recursive listing?
* @return the number of files found.
* @throws IOException failure.
*/
public static void lsR(FileSystem fileSystem, Path path, boolean recursive)
public static long lsR(FileSystem fileSystem, Path path, boolean recursive)
throws Exception {
if (path == null) {
// surfaces when someone calls getParent() on something at the top
// of the path
LOG.info("Empty path");
return;
return 0;
}
S3AUtils.applyLocatedFiles(fileSystem.listFiles(path, recursive),
(status) -> LOG.info(" {}", status));
return S3AUtils.applyLocatedFiles(fileSystem.listFiles(path, recursive),
(status) -> LOG.info("{}", status));
}
/**

View File

@ -18,12 +18,6 @@
package org.apache.hadoop.fs.s3a;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.junit.Assert.*;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
@ -33,14 +27,20 @@ import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.junit.Assert.*;
/**
* Unit tests for {@link Constants#AWS_CREDENTIALS_PROVIDER} logic.
*/
@ -248,10 +248,10 @@ public class TestS3AAWSCredentialsProvider {
AWSCredentialsProvider provider = providers.get(i);
assertNotNull(
String.format("At position %d, expected class is %s, but found null.",
i, expectedClass), provider);
i, expectedClass), provider);
assertTrue(
String.format("At position %d, expected class is %s, but found %s.",
i, expectedClass, provider.getClass()),
i, expectedClass, provider.getClass()),
expectedClass.isAssignableFrom(provider.getClass()));
}
}
@ -269,7 +269,23 @@ public class TestS3AAWSCredentialsProvider {
assertNotNull(provider2);
assertInstanceOf(InstanceProfileCredentialsProvider.class, provider2);
assertSame("Expected all usage of InstanceProfileCredentialsProvider to "
+ "share a singleton instance, but found unique instances.",
+ "share a singleton instance, but found unique instances.",
provider1, provider2);
}
/**
* This is here to check up on the S3ATestUtils probes themselves.
* @see S3ATestUtils#authenticationContains(Configuration, String).
*/
@Test
public void testAuthenticationContainsProbes() {
Configuration conf = new Configuration(false);
assertFalse("found AssumedRoleCredentialProvider",
authenticationContains(conf, AssumedRoleCredentialProvider.NAME));
conf.set(AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.NAME);
assertTrue("didn't find AssumedRoleCredentialProvider",
authenticationContains(conf, AssumedRoleCredentialProvider.NAME));
}
}

View File

@ -0,0 +1,789 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.auth;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.AWSBadRequestException;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.MultipartUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.CommitOperations;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.*;
import static org.apache.hadoop.fs.s3a.auth.RoleModel.*;
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.*;
import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.forbidden;
import static org.apache.hadoop.test.LambdaTestUtils.*;
/**
* Tests use of assumed roles.
* Only run if an assumed role is provided.
*/
@SuppressWarnings({"IOResourceOpenedButNotSafelyClosed", "ThrowableNotThrown"})
public class ITestAssumeRole extends AbstractS3ATestBase {
private static final Logger LOG =
LoggerFactory.getLogger(ITestAssumeRole.class);
private static final Path ROOT = new Path("/");
/**
* A role FS; if non-null it is closed in teardown.
*/
private S3AFileSystem roleFS;
@Override
public void setup() throws Exception {
super.setup();
assumeRoleTests();
}
@Override
public void teardown() throws Exception {
S3AUtils.closeAll(LOG, roleFS);
super.teardown();
}
private void assumeRoleTests() {
assume("No ARN for role tests", !getAssumedRoleARN().isEmpty());
}
private String getAssumedRoleARN() {
return getContract().getConf().getTrimmed(ASSUMED_ROLE_ARN, "");
}
/**
* Expect a filesystem to fail to instantiate.
* @param conf config to use
* @param clazz class of exception to expect
* @param text text in exception
* @param <E> type of exception as inferred from clazz
* @throws Exception if the exception was the wrong class
*/
private <E extends Throwable> void expectFileSystemCreateFailure(
Configuration conf,
Class<E> clazz,
String text) throws Exception {
interceptClosing(clazz,
text,
() -> new Path(getFileSystem().getUri()).getFileSystem(conf));
}
@Test
public void testCreateCredentialProvider() throws IOException {
describe("Create the credential provider");
String roleARN = getAssumedRoleARN();
Configuration conf = new Configuration(getContract().getConf());
conf.set(AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.NAME);
conf.set(ASSUMED_ROLE_ARN, roleARN);
conf.set(ASSUMED_ROLE_SESSION_NAME, "valid");
conf.set(ASSUMED_ROLE_SESSION_DURATION, "45m");
bindRolePolicy(conf, RESTRICTED_POLICY);
try (AssumedRoleCredentialProvider provider
= new AssumedRoleCredentialProvider(conf)) {
LOG.info("Provider is {}", provider);
AWSCredentials credentials = provider.getCredentials();
assertNotNull("Null credentials from " + provider, credentials);
}
}
@Test
public void testAssumedInvalidRole() throws Throwable {
Configuration conf = new Configuration();
conf.set(ASSUMED_ROLE_ARN, ROLE_ARN_EXAMPLE);
interceptClosing(AWSSecurityTokenServiceException.class,
E_BAD_ROLE,
() -> new AssumedRoleCredentialProvider(conf));
}
@Test
public void testAssumeRoleFSBadARN() throws Exception {
describe("Attemnpt to create the FS with an invalid ARN");
Configuration conf = createAssumedRoleConfig();
conf.set(ASSUMED_ROLE_ARN, ROLE_ARN_EXAMPLE);
expectFileSystemCreateFailure(conf, AccessDeniedException.class,
E_BAD_ROLE);
}
@Test
public void testAssumeRoleNoARN() throws Exception {
describe("Attemnpt to create the FS with no ARN");
Configuration conf = createAssumedRoleConfig();
conf.unset(ASSUMED_ROLE_ARN);
expectFileSystemCreateFailure(conf,
IOException.class,
AssumedRoleCredentialProvider.E_NO_ROLE);
}
@Test
public void testAssumeRoleFSBadPolicy() throws Exception {
describe("Attemnpt to create the FS with malformed JSON");
Configuration conf = createAssumedRoleConfig();
// add some malformed JSON
conf.set(ASSUMED_ROLE_POLICY, "}");
expectFileSystemCreateFailure(conf,
AWSBadRequestException.class,
"JSON");
}
@Test
public void testAssumeRoleFSBadPolicy2() throws Exception {
describe("Attempt to create the FS with valid but non-compliant JSON");
Configuration conf = createAssumedRoleConfig();
// add some invalid JSON
conf.set(ASSUMED_ROLE_POLICY, "{'json':'but not what AWS wants}");
expectFileSystemCreateFailure(conf,
AWSBadRequestException.class,
"Syntax errors in policy");
}
@Test
public void testAssumeRoleCannotAuthAssumedRole() throws Exception {
describe("Assert that you can't use assumed roles to auth assumed roles");
Configuration conf = createAssumedRoleConfig();
conf.set(ASSUMED_ROLE_CREDENTIALS_PROVIDER,
AssumedRoleCredentialProvider.NAME);
expectFileSystemCreateFailure(conf,
IOException.class,
AssumedRoleCredentialProvider.E_FORBIDDEN_PROVIDER);
}
@Test
public void testAssumeRoleBadInnerAuth() throws Exception {
describe("Try to authenticate with a keypair with spaces");
Configuration conf = createAssumedRoleConfig();
conf.set(ASSUMED_ROLE_CREDENTIALS_PROVIDER,
SimpleAWSCredentialsProvider.NAME);
conf.set(ACCESS_KEY, "not valid");
conf.set(SECRET_KEY, "not secret");
expectFileSystemCreateFailure(conf,
AWSBadRequestException.class,
"not a valid " +
"key=value pair (missing equal-sign) in Authorization header");
}
@Test
public void testAssumeRoleBadInnerAuth2() throws Exception {
describe("Try to authenticate with an invalid keypair");
Configuration conf = createAssumedRoleConfig();
conf.set(ASSUMED_ROLE_CREDENTIALS_PROVIDER,
SimpleAWSCredentialsProvider.NAME);
conf.set(ACCESS_KEY, "notvalid");
conf.set(SECRET_KEY, "notsecret");
expectFileSystemCreateFailure(conf,
AccessDeniedException.class,
"The security token included in the request is invalid");
}
@Test
public void testAssumeRoleBadSession() throws Exception {
describe("Try to authenticate with an invalid session");
Configuration conf = createAssumedRoleConfig();
conf.set(ASSUMED_ROLE_SESSION_NAME,
"Session names cannot hava spaces!");
expectFileSystemCreateFailure(conf,
AWSBadRequestException.class,
"Member must satisfy regular expression pattern");
}
/**
* Create the assumed role configuration.
* @return a config bonded to the ARN of the assumed role
*/
public Configuration createAssumedRoleConfig() {
return createAssumedRoleConfig(getAssumedRoleARN());
}
/**
* Create a config for an assumed role; it also disables FS caching.
* @param roleARN ARN of role
* @return the new configuration
*/
private Configuration createAssumedRoleConfig(String roleARN) {
return newAssumedRoleConfig(getContract().getConf(), roleARN);
}
@Test
public void testAssumeRoleUndefined() throws Throwable {
describe("Verify that you cannot instantiate the"
+ " AssumedRoleCredentialProvider without a role ARN");
Configuration conf = new Configuration();
conf.set(ASSUMED_ROLE_ARN, "");
interceptClosing(IOException.class,
AssumedRoleCredentialProvider.E_NO_ROLE,
() -> new AssumedRoleCredentialProvider(conf));
}
@Test
public void testAssumedIllegalDuration() throws Throwable {
describe("Expect the constructor to fail if the session is to short");
Configuration conf = new Configuration();
conf.set(ASSUMED_ROLE_SESSION_DURATION, "30s");
interceptClosing(IllegalArgumentException.class, "",
() -> new AssumedRoleCredentialProvider(conf));
}
@Test
public void testAssumeRoleCreateFS() throws IOException {
describe("Create an FS client with the role and do some basic IO");
String roleARN = getAssumedRoleARN();
Configuration conf = createAssumedRoleConfig(roleARN);
Path path = new Path(getFileSystem().getUri());
LOG.info("Creating test FS and user {} with assumed role {}",
conf.get(ACCESS_KEY), roleARN);
try (FileSystem fs = path.getFileSystem(conf)) {
fs.getFileStatus(new Path("/"));
fs.mkdirs(path("testAssumeRoleFS"));
}
}
@Test
public void testAssumeRoleRestrictedPolicyFS() throws Exception {
describe("Restrict the policy for this session; verify that reads fail");
Configuration conf = createAssumedRoleConfig();
bindRolePolicy(conf, RESTRICTED_POLICY);
Path path = new Path(getFileSystem().getUri());
try (FileSystem fs = path.getFileSystem(conf)) {
forbidden("getFileStatus",
() -> fs.getFileStatus(new Path("/")));
forbidden("getFileStatus",
() -> fs.listStatus(new Path("/")));
forbidden("getFileStatus",
() -> fs.mkdirs(path("testAssumeRoleFS")));
}
}
/**
* Tighten the extra policy on the assumed role call for torrent access,
* and verify that it blocks all other operations.
* That is: any non empty policy in the assumeRole API call overrides
* all of the policies attached to the role before.
* switches the role instance to only those policies in the
*/
@Test
public void testAssumeRolePoliciesOverrideRolePerms() throws Throwable {
describe("extra policies in assumed roles need;"
+ " all required policies stated");
Configuration conf = createAssumedRoleConfig();
bindRolePolicy(conf,
policy(statement(false, S3_ALL_BUCKETS, S3_GET_OBJECT_TORRENT)));
Path path = path("testAssumeRoleStillIncludesRolePerms");
roleFS = (S3AFileSystem) path.getFileSystem(conf);
assertTouchForbidden(roleFS, path);
}
/**
* After blocking all write verbs used by S3A, try to write data (fail)
* and read data (succeed).
*/
@Test
public void testReadOnlyOperations() throws Throwable {
describe("Restrict role to read only");
Configuration conf = createAssumedRoleConfig();
bindRolePolicy(conf,
policy(
statement(false, S3_ALL_BUCKETS, S3_PATH_WRITE_OPERATIONS),
STATEMENT_ALL_S3, STATEMENT_ALL_DDB));
Path path = methodPath();
roleFS = (S3AFileSystem) path.getFileSystem(conf);
// list the root path, expect happy
roleFS.listStatus(ROOT);
// touch will fail
assertTouchForbidden(roleFS, path);
// you can delete it, because it's not there and getFileStatus() is allowed
roleFS.delete(path, true);
//create it with the full FS
getFileSystem().mkdirs(path);
// and delete will not
assertDeleteForbidden(this.roleFS, path);
// list multipart uploads.
// This is part of the read policy.
int counter = 0;
MultipartUtils.UploadIterator iterator = roleFS.listUploads("/");
while (iterator.hasNext()) {
counter++;
iterator.next();
}
LOG.info("Found {} outstanding MPUs", counter);
}
/**
* Write successfully to the directory with full R/W access,
* fail to write or delete data elsewhere.
*/
@SuppressWarnings("StringConcatenationMissingWhitespace")
@Test
public void testRestrictedWriteSubdir() throws Throwable {
describe("Attempt writing to paths where a role only has"
+ " write access to a subdir of the bucket");
Path restrictedDir = methodPath();
Path child = new Path(restrictedDir, "child");
// the full FS
S3AFileSystem fs = getFileSystem();
fs.delete(restrictedDir, true);
Configuration conf = createAssumedRoleConfig();
bindRolePolicyStatements(conf,
STATEMENT_ALL_DDB,
statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
new Statement(Effects.Allow)
.addActions(S3_ALL_OPERATIONS)
.addResources(directory(restrictedDir)));
roleFS = (S3AFileSystem) restrictedDir.getFileSystem(conf);
roleFS.getFileStatus(ROOT);
roleFS.mkdirs(restrictedDir);
assertIsDirectory(restrictedDir);
// you can create an adjacent child
touch(roleFS, child);
assertIsFile(child);
// child delete rights
ContractTestUtils.assertDeleted(roleFS, child, true);
// parent delete rights
ContractTestUtils.assertDeleted(roleFS, restrictedDir, true);
// delete will try to create an empty parent directory marker, and may fail
roleFS.delete(restrictedDir, false);
// this sibling path has the same prefix as restrictedDir, but is
// adjacent. This verifies that a restrictedDir* pattern isn't matching
// siblings, so granting broader rights
Path sibling = new Path(restrictedDir.toUri() + "sibling");
touch(fs, sibling);
assertTouchForbidden(roleFS, sibling);
assertDeleteForbidden(roleFS, sibling);
}
public Path methodPath() throws IOException {
return path(getMethodName());
}
@Test
public void testRestrictedRename() throws Throwable {
describe("rename with parent paths not writeable");
executeRestrictedRename(createAssumedRoleConfig());
}
@Test
public void testRestrictedSingleDeleteRename() throws Throwable {
describe("rename with parent paths not writeable"
+ " and multi-object delete disabled");
Configuration conf = createAssumedRoleConfig();
conf.setBoolean(ENABLE_MULTI_DELETE, false);
executeRestrictedRename(conf);
}
/**
* Execute a sequence of rename operations.
* @param conf FS configuration
*/
public void executeRestrictedRename(final Configuration conf)
throws IOException {
Path basePath = methodPath();
Path restrictedDir = new Path(basePath, "renameSrc");
Path destPath = new Path(basePath, "renameDest");
Path child = new Path(restrictedDir, "child");
// the full FS
S3AFileSystem fs = getFileSystem();
fs.delete(basePath, true);
bindRolePolicyStatements(conf,
STATEMENT_ALL_DDB,
statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
new Statement(Effects.Allow)
.addActions(S3_PATH_RW_OPERATIONS)
.addResources(directory(restrictedDir))
.addResources(directory(destPath))
);
roleFS = (S3AFileSystem) restrictedDir.getFileSystem(conf);
roleFS.getFileStatus(ROOT);
roleFS.mkdirs(restrictedDir);
// you can create an adjacent child
touch(roleFS, child);
roleFS.delete(destPath, true);
// as dest doesn't exist, this will map child -> dest
assertRenameOutcome(roleFS, child, destPath, true);
assertIsFile(destPath);
assertIsDirectory(restrictedDir);
Path renamedDestPath = new Path(restrictedDir, destPath.getName());
assertRenameOutcome(roleFS, destPath, restrictedDir, true);
assertIsFile(renamedDestPath);
roleFS.delete(restrictedDir, true);
roleFS.delete(destPath, true);
}
@Test
public void testRestrictedRenameReadOnlyData() throws Throwable {
describe("rename with source read only, multidelete");
executeRenameReadOnlyData(createAssumedRoleConfig());
}
@Test
public void testRestrictedRenameReadOnlySingleDelete() throws Throwable {
describe("rename with source read only single delete");
Configuration conf = createAssumedRoleConfig();
conf.setBoolean(ENABLE_MULTI_DELETE, false);
executeRenameReadOnlyData(conf);
}
/**
* Execute a sequence of rename operations where the source
* data is read only to the client calling rename().
* This will cause the inner delete() operations to fail, whose outcomes
* are explored.
* Multiple files are created (in parallel) for some renames, so exploring
* the outcome on bulk delete calls, including verifying that a
* MultiObjectDeleteException is translated to an AccessDeniedException.
* <ol>
* <li>The exception raised is AccessDeniedException,
* from single and multi DELETE calls.</li>
* <li>It happens after the COPY. Not ideal, but, well, we can't pretend
* it's a filesystem forever.</li>
* </ol>
* @param conf FS configuration
*/
public void executeRenameReadOnlyData(final Configuration conf)
throws Exception {
assume("Does not work with S3Guard", !getFileSystem().hasMetadataStore());
Path basePath = methodPath();
Path destDir = new Path(basePath, "renameDest");
Path readOnlyDir = new Path(basePath, "readonlyDir");
Path readOnlyFile = new Path(readOnlyDir, "readonlyChild");
// the full FS
S3AFileSystem fs = getFileSystem();
fs.delete(basePath, true);
// this file is readable by the roleFS, but cannot be deleted
touch(fs, readOnlyFile);
bindRolePolicyStatements(conf,
STATEMENT_ALL_DDB,
statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
new Statement(Effects.Allow)
.addActions(S3_PATH_RW_OPERATIONS)
.addResources(directory(destDir))
);
roleFS = (S3AFileSystem) destDir.getFileSystem(conf);
roleFS.delete(destDir, true);
roleFS.mkdirs(destDir);
// rename will fail in the delete phase
forbidden(readOnlyFile.toString(),
() -> roleFS.rename(readOnlyFile, destDir));
// and the source file is still there
assertIsFile(readOnlyFile);
// but so is the copied version, because there's no attempt
// at rollback, or preflight checking on the delete permissions
Path renamedFile = new Path(destDir, readOnlyFile.getName());
assertIsFile(renamedFile);
ContractTestUtils.assertDeleted(roleFS, renamedFile, true);
assertFileCount("Empty Dest Dir", roleFS,
destDir, 0);
// create a set of files
// this is done in parallel as it is 10x faster on a long-haul test run.
int range = 10;
touchFiles(fs, readOnlyDir, range);
// don't forget about that original file!
final long createdFiles = range + 1;
// are they all there?
assertFileCount("files ready to rename", roleFS,
readOnlyDir, createdFiles);
// try to rename the directory
LOG.info("Renaming readonly files {} to {}", readOnlyDir, destDir);
AccessDeniedException ex = forbidden("",
() -> roleFS.rename(readOnlyDir, destDir));
LOG.info("Result of renaming read-only files is AccessDeniedException", ex);
assertFileCount("files copied to the destination", roleFS,
destDir, createdFiles);
assertFileCount("files in the source directory", roleFS,
readOnlyDir, createdFiles);
// and finally (so as to avoid the delay of POSTing some more objects,
// delete that r/o source
forbidden("", () -> roleFS.delete(readOnlyDir, true));
}
/**
* Parallel-touch a set of files in the destination directory.
* @param fs filesystem
* @param destDir destination
* @param range range 1..range inclusive of files to create.
*/
public void touchFiles(final S3AFileSystem fs,
final Path destDir,
final int range) {
IntStream.rangeClosed(1, range).parallel().forEach(
(i) -> eval(() -> touch(fs, new Path(destDir, "file-" + i))));
}
@Test
public void testRestrictedCommitActions() throws Throwable {
describe("Attempt commit operations against a path with restricted rights");
Configuration conf = createAssumedRoleConfig();
conf.setBoolean(CommitConstants.MAGIC_COMMITTER_ENABLED, true);
final int uploadPartSize = 5 * 1024 * 1024;
Path basePath = methodPath();
Path readOnlyDir = new Path(basePath, "readOnlyDir");
Path writeableDir = new Path(basePath, "writeableDir");
// the full FS
S3AFileSystem fs = getFileSystem();
fs.delete(basePath, true);
fs.mkdirs(readOnlyDir);
bindRolePolicyStatements(conf,
STATEMENT_ALL_DDB,
statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
new Statement(Effects.Allow)
.addActions(S3_PATH_RW_OPERATIONS)
.addResources(directory(writeableDir))
);
roleFS = (S3AFileSystem) writeableDir.getFileSystem(conf);
CommitOperations fullOperations = new CommitOperations(fs);
CommitOperations operations = new CommitOperations(roleFS);
File localSrc = File.createTempFile("source", "");
writeCSVData(localSrc);
Path uploadDest = new Path(readOnlyDir, "restricted.csv");
forbidden("initiate MultiPartUpload",
() -> {
return operations.uploadFileToPendingCommit(localSrc,
uploadDest, "", uploadPartSize);
});
// delete the file
localSrc.delete();
// create a directory there
localSrc.mkdirs();
// create some local files and upload them with permissions
int range = 2;
IntStream.rangeClosed(1, range)
.parallel()
.forEach((i) -> eval(() -> {
String name = "part-000" + i;
File src = new File(localSrc, name);
Path dest = new Path(readOnlyDir, name);
writeCSVData(src);
SinglePendingCommit pending =
fullOperations.uploadFileToPendingCommit(src, dest, "",
uploadPartSize);
pending.save(fs, new Path(readOnlyDir,
name + CommitConstants.PENDING_SUFFIX), true);
assertTrue(src.delete());
}));
try {
// we expect to be able to list all the files here
Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>>
pendingCommits = operations.loadSinglePendingCommits(readOnlyDir,
true);
// all those commits must fail
List<SinglePendingCommit> commits = pendingCommits.getLeft().getCommits();
assertEquals(range, commits.size());
commits.parallelStream().forEach(
(c) -> {
CommitOperations.MaybeIOE maybeIOE = operations.commit(c, "origin");
Path path = c.destinationPath();
assertCommitAccessDenied(path, maybeIOE);
});
// fail of all list and abort of .pending files.
LOG.info("abortAllSinglePendingCommits({})", readOnlyDir);
assertCommitAccessDenied(readOnlyDir,
operations.abortAllSinglePendingCommits(readOnlyDir, true));
// try writing a magic file
Path magicDestPath = new Path(readOnlyDir,
CommitConstants.MAGIC + "/" + "magic.txt");
forbidden("", () -> {
touch(roleFS, magicDestPath);
// shouldn't get here; if we do: return the existence of the 0-byte
// dest file.
return fs.getFileStatus(magicDestPath);
});
// a recursive list and abort is blocked.
forbidden("",
() -> operations.abortPendingUploadsUnderPath(readOnlyDir));
} finally {
LOG.info("Cleanup");
fullOperations.abortPendingUploadsUnderPath(readOnlyDir);
}
}
/**
* Verifies that an operation returning a "MaybeIOE" failed
* with an AccessDeniedException in the maybe instance.
* @param path path operated on
* @param maybeIOE result to inspect
*/
public void assertCommitAccessDenied(final Path path,
final CommitOperations.MaybeIOE maybeIOE) {
IOException ex = maybeIOE.getException();
assertNotNull("no IOE in " + maybeIOE + " for " + path, ex);
if (!(ex instanceof AccessDeniedException)) {
ContractTestUtils.fail("Wrong exception class for commit to "
+ path, ex);
}
}
/**
* Write some CSV data to a local file.
* @param localSrc local file
* @throws IOException failure
*/
public void writeCSVData(final File localSrc) throws IOException {
try(FileOutputStream fo = new FileOutputStream(localSrc)) {
fo.write("1, true".getBytes());
}
}
@Test
public void testPartialDelete() throws Throwable {
describe("delete with part of the child tree read only; multidelete");
executePartialDelete(createAssumedRoleConfig());
}
@Test
public void testPartialDeleteSingleDelete() throws Throwable {
describe("delete with part of the child tree read only");
Configuration conf = createAssumedRoleConfig();
conf.setBoolean(ENABLE_MULTI_DELETE, false);
executePartialDelete(conf);
}
/**
* Have a directory with full R/W permissions, but then remove
* write access underneath, and try to delete it.
* @param conf FS configuration
*/
public void executePartialDelete(final Configuration conf)
throws Exception {
Path destDir = methodPath();
Path readOnlyDir = new Path(destDir, "readonlyDir");
// the full FS
S3AFileSystem fs = getFileSystem();
fs.delete(destDir, true);
bindRolePolicyStatements(conf,
STATEMENT_ALL_DDB,
statement(true, S3_ALL_BUCKETS, S3_ALL_OPERATIONS),
new Statement(Effects.Deny)
.addActions(S3_PATH_WRITE_OPERATIONS)
.addResources(directory(readOnlyDir))
);
roleFS = (S3AFileSystem) destDir.getFileSystem(conf);
int range = 10;
touchFiles(fs, readOnlyDir, range);
touchFiles(roleFS, destDir, range);
forbidden("", () -> roleFS.delete(readOnlyDir, true));
forbidden("", () -> roleFS.delete(destDir, true));
// and although you can't delete under the path, if the file doesn't
// exist, the delete call fails fast.
Path pathWhichDoesntExist = new Path(readOnlyDir, "no-such-path");
assertFalse("deleting " + pathWhichDoesntExist,
roleFS.delete(pathWhichDoesntExist, true));
}
/**
* Assert that the number of files in a destination matches that expected.
* @param text text to use in the message
* @param fs filesystem
* @param path path to list (recursively)
* @param expected expected count
* @throws IOException IO problem
*/
private static void assertFileCount(String text, FileSystem fs,
Path path, long expected)
throws IOException {
List<String> files = new ArrayList<>();
applyLocatedFiles(fs.listFiles(path, true),
(status) -> files.add(status.getPath().toString()));
long actual = files.size();
if (actual != expected) {
String ls = files.stream().collect(Collectors.joining("\n"));
fail(text + ": expected " + expected + " files in " + path
+ " but got " + actual + "\n" + ls);
}
}
}

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.auth;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.commit.ITestCommitOperations;
import static org.apache.hadoop.fs.s3a.Constants.ASSUMED_ROLE_ARN;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
import static org.apache.hadoop.fs.s3a.auth.RoleModel.*;
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.*;
import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.*;
/**
* Verify that the commit operations work with a restricted set of operations.
* The superclass, {@link ITestCommitOperations} turns on an inconsistent client
* to see how things work in the presence of inconsistency.
* These tests disable it, to remove that as a factor in these tests, which are
* verifying that the policy settings to enabled MPU list/commit/abort are all
* enabled properly.
*/
public class ITestAssumedRoleCommitOperations extends ITestCommitOperations {
private static final Logger LOG =
LoggerFactory.getLogger(ITestAssumedRoleCommitOperations.class);
/**
* The restricted directory.
*/
private Path restrictedDir;
/**
* A role FS; if non-null it is closed in teardown.
*/
private S3AFileSystem roleFS;
@Override
public boolean useInconsistentClient() {
return false;
}
@Override
public void setup() throws Exception {
super.setup();
assumeRoleTests();
restrictedDir = super.path("restricted");
Configuration conf = newAssumedRoleConfig(getConfiguration(),
getAssumedRoleARN());
bindRolePolicyStatements(conf,
STATEMENT_ALL_DDB,
statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
new RoleModel.Statement(RoleModel.Effects.Allow)
.addActions(S3_PATH_RW_OPERATIONS)
.addResources(directory(restrictedDir))
);
roleFS = (S3AFileSystem) restrictedDir.getFileSystem(conf);
}
@Override
public void teardown() throws Exception {
S3AUtils.closeAll(LOG, roleFS);
// switches getFileSystem() back to the full FS.
roleFS = null;
super.teardown();
}
private void assumeRoleTests() {
assume("No ARN for role tests", !getAssumedRoleARN().isEmpty());
}
/**
* The overridden operation returns the roleFS, so that test cases
* in the superclass run under restricted rights.
* There's special handling in startup to avoid NPEs
* @return {@link #roleFS}
*/
@Override
public S3AFileSystem getFileSystem() {
return roleFS != null ? roleFS : getFullFileSystem();
}
/**
* Get the FS with full access rights.
* @return the FS created by the superclass.
*/
public S3AFileSystem getFullFileSystem() {
return super.getFileSystem();
}
/**
* switch to an inconsistent path if in inconsistent mode.
* {@inheritDoc}
*/
@Override
protected Path path(String filepath) throws IOException {
return new Path(restrictedDir, filepath);
}
private String getAssumedRoleARN() {
return getContract().getConf().getTrimmed(ASSUMED_ROLE_ARN, "");
}
}

View File

@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.auth;
import java.nio.file.AccessDeniedException;
import java.util.concurrent.Callable;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.auth.RoleModel.*;
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.*;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Helper class for testing roles.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class RoleTestUtils {
private static final Logger LOG =
LoggerFactory.getLogger(RoleTestUtils.class);
private static final RoleModel MODEL = new RoleModel();
/** Example ARN of a role. */
public static final String ROLE_ARN_EXAMPLE
= "arn:aws:iam::9878543210123:role/role-s3-restricted";
/** Deny GET requests to all buckets. */
public static final Statement DENY_GET_ALL =
statement(false, S3_ALL_BUCKETS, S3_GET_OBJECT);
/**
* This is AWS policy removes read access.
*/
public static final Policy RESTRICTED_POLICY = policy(DENY_GET_ALL);
/**
* Error message to get from the AWS SDK if you can't assume the role.
*/
public static final String E_BAD_ROLE
= "Not authorized to perform sts:AssumeRole";
private RoleTestUtils() {
}
/**
* Bind the configuration's {@code ASSUMED_ROLE_POLICY} option to
* the given policy.
* @param conf configuration to patch
* @param policy policy to apply
* @return the modified configuration
* @throws JsonProcessingException JSON marshalling error
*/
public static Configuration bindRolePolicy(final Configuration conf,
final Policy policy) throws JsonProcessingException {
String p = MODEL.toJson(policy);
LOG.info("Setting role policy to policy of size {}:\n{}", p.length(), p);
conf.set(ASSUMED_ROLE_POLICY, p);
return conf;
}
/**
* Wrap a set of statements with a policy and bind the configuration's
* {@code ASSUMED_ROLE_POLICY} option to it.
* @param conf configuration to patch
* @param statements statements to aggregate
* @return the modified configuration
* @throws JsonProcessingException JSON marshalling error
*/
public static Configuration bindRolePolicyStatements(
final Configuration conf,
final Statement... statements) throws JsonProcessingException {
return bindRolePolicy(conf, policy(statements));
}
/**
* Try to delete a file, verify that it is not allowed.
* @param fs filesystem
* @param path path
*/
public static void assertDeleteForbidden(final FileSystem fs, final Path path)
throws Exception {
intercept(AccessDeniedException.class, "",
() -> fs.delete(path, true));
}
/**
* Try to touch a file, verify that it is not allowed.
* @param fs filesystem
* @param path path
*/
public static void assertTouchForbidden(final FileSystem fs, final Path path)
throws Exception {
intercept(AccessDeniedException.class, "",
"Caller could create file at " + path,
() -> {
touch(fs, path);
return fs.getFileStatus(path);
});
}
/**
* Create a config for an assumed role; it also disables FS caching.
* @param srcConf source config: this is not modified
* @param roleARN ARN of role
* @return the new configuration
*/
public static Configuration newAssumedRoleConfig(
final Configuration srcConf,
final String roleARN) {
Configuration conf = new Configuration(srcConf);
conf.set(AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.NAME);
conf.set(ASSUMED_ROLE_ARN, roleARN);
conf.set(ASSUMED_ROLE_SESSION_NAME, "valid");
conf.set(ASSUMED_ROLE_SESSION_DURATION, "15m");
disableFilesystemCaching(conf);
return conf;
}
/**
* Assert that an operation is forbidden.
* @param contained contained text, may be null
* @param eval closure to evaluate
* @param <T> type of closure
* @return the access denied exception
* @throws Exception any other exception
*/
public static <T> AccessDeniedException forbidden(
String contained,
Callable<T> eval)
throws Exception {
AccessDeniedException ex = intercept(AccessDeniedException.class, eval);
GenericTestUtils.assertExceptionContains(contained, ex);
return ex;
}
}

View File

@ -208,7 +208,9 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
* @param p probability of a throttling occurring: 0-1.0
*/
protected void setThrottling(float p) {
inconsistentClient.setThrottleProbability(p);
if (inconsistentClient != null) {
inconsistentClient.setThrottleProbability(p);
}
}
/**
@ -217,7 +219,9 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
* @param limit limit to number of calls which fail
*/
protected void setThrottling(float p, int limit) {
inconsistentClient.setThrottleProbability(p);
if (inconsistentClient != null) {
inconsistentClient.setThrottleProbability(p);
}
setFailureLimit(limit);
}
@ -235,7 +239,9 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
* @param limit limit to number of calls which fail
*/
private void setFailureLimit(int limit) {
inconsistentClient.setFailureLimit(limit);
if (inconsistentClient != null) {
inconsistentClient.setFailureLimit(limit);
}
}
/**

View File

@ -528,7 +528,9 @@ public class ITestCommitOperations extends AbstractCommitITest {
@Test
public void testWriteNormalStream() throws Throwable {
S3AFileSystem fs = getFileSystem();
Assume.assumeTrue(fs.hasCapability(STREAM_CAPABILITY_MAGIC_OUTPUT));
Assume.assumeTrue(
"Filesystem does not have magic support enabled: " + fs,
fs.hasCapability(STORE_CAPABILITY_MAGIC_COMMITTER));
Path destFile = path("normal");
try (FSDataOutputStream out = fs.create(destFile, true)) {