HADOOP-17822. fs.s3a.acl.default not working after S3A Audit feature (#3249)

Fixes the regression caused by HADOOP-17511 by moving where the
option fs.s3a.acl.default is read: it is now read before the
RequestFactory is created.

Adds

* A unit test in TestRequestFactory to verify the ACLs are set
  on all file write operations.
* A new ITestS3ACannedACLs test which verifies that ACLs really
  do get all the way through.
* The IAM permission s3:PutObjectAcl to the role generated by
  S3A Assumed Role delegation tokens.

Contributed by Steve Loughran

Change-Id: I3abac6a1b9e150b6b6df0af7c2c70093f8f518cb
Steve Loughran 2021-08-02 15:26:56 +01:00
parent 26514b6534
commit c1ad91e72d
GPG Key ID: D22CF846DBB162A0
8 changed files with 159 additions and 3 deletions

View File

@@ -1541,7 +1541,9 @@
<name>fs.s3a.acl.default</name>
<description>Set a canned ACL for newly created and copied objects. Value may be Private,
PublicRead, PublicReadWrite, AuthenticatedRead, LogDeliveryWrite, BucketOwnerRead,
or BucketOwnerFullControl.</description>
or BucketOwnerFullControl.
If set, caller IAM role must have "s3:PutObjectAcl" permission on the bucket.
</description>
</property>
<property>
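
As an illustration of the documented behaviour above, a minimal sketch of enabling the
option from client code; the bucket name and path are examples only, and the same value
can equally be set in core-site.xml or as a per-bucket override:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CannedAclExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Attach a canned ACL to every object S3A creates or copies.
    conf.set("fs.s3a.acl.default", "BucketOwnerFullControl");

    // The IAM identity performing the writes must also hold s3:PutObjectAcl,
    // otherwise the PUT/COPY requests are rejected by S3.
    try (FileSystem fs = FileSystem.newInstance(
        new URI("s3a://example-bucket/"), conf)) {
      fs.create(new Path("/tmp/acl-demo.txt")).close();
    }
  }
}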

View File

@@ -481,7 +481,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
initTransferManager();
initCannedAcls(conf);
// This initiates a probe against S3 for the bucket existing.
doBucketProbing();
@@ -888,6 +887,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
UPLOAD_PART_COUNT_LIMIT);
}
// ACLs; this is passed to the
// request factory.
initCannedAcls(getConf());
return RequestFactoryImpl.builder()
.withBucket(requireNonNull(bucket))
.withCannedACL(getCannedACL())
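
The reordering matters because the request factory captures the canned ACL when it is
built, so a value read afterwards never reaches the requests it creates. A standalone
sketch of that pre-fix failure mode, using only the builder and request methods exercised
by this patch (the api/impl package names are assumed, everything else is as shown above):

import java.io.ByteArrayInputStream;

import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.ObjectMetadata;

import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl;

public class AclOrderingSketch {
  public static void main(String[] args) {
    // Pre-fix state: fs.s3a.acl.default has not been read yet, so no ACL is known.
    CannedAccessControlList acl = null;
    RequestFactory factory = RequestFactoryImpl.builder()
        .withBucket("example-bucket")
        .withCannedACL(acl)          // the factory snapshots this value at build time
        .build();

    // Reading the option after the factory exists changes nothing:
    acl = CannedAccessControlList.BucketOwnerFullControl;   // too late

    ObjectMetadata md = factory.newObjectMetadata(128);
    CannedAccessControlList requestAcl = factory
        .newPutObjectRequest("example-key", md, new ByteArrayInputStream(new byte[0]))
        .getCannedAcl();
    System.out.println("ACL on PUT request: " + requestAcl);  // null: no ACL sent to S3
  }
}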

View File

@@ -248,6 +248,7 @@ public final class RolePolicies {
Collections.unmodifiableList(Arrays.asList(new String[]{
S3_ALL_GET,
S3_PUT_OBJECT,
S3_PUT_OBJECT_ACL,
S3_DELETE_OBJECT,
S3_ABORT_MULTIPART_UPLOAD,
}));
@@ -262,6 +263,7 @@ public final class RolePolicies {
public static final List<String> S3_PATH_WRITE_OPERATIONS =
Collections.unmodifiableList(Arrays.asList(new String[]{
S3_PUT_OBJECT,
S3_PUT_OBJECT_ACL,
S3_DELETE_OBJECT,
S3_ABORT_MULTIPART_UPLOAD
}));
@@ -274,6 +276,7 @@ public final class RolePolicies {
Collections.unmodifiableList(Arrays.asList(new String[]{
S3_ALL_GET,
S3_PUT_OBJECT,
S3_PUT_OBJECT_ACL,
S3_DELETE_OBJECT,
S3_ABORT_MULTIPART_UPLOAD,
}));
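
A quick way to confirm the widened role policies is to check the published operation
lists for the new action; a test-style sketch, assuming RolePolicies lives in
org.apache.hadoop.fs.s3a.auth:

import org.assertj.core.api.Assertions;

import static org.apache.hadoop.fs.s3a.auth.RolePolicies.S3_PATH_WRITE_OPERATIONS;
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.S3_PUT_OBJECT_ACL;

public class RolePolicyAclCheck {
  public static void main(String[] args) {
    // With a canned ACL configured, every PUT/COPY also sets an object ACL,
    // so roles generated for delegation tokens must grant s3:PutObjectAcl too.
    Assertions.assertThat(S3_PATH_WRITE_OPERATIONS)
        .describedAs("write operations granted to generated assumed roles")
        .contains(S3_PUT_OBJECT_ACL);
  }
}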

View File

@@ -944,7 +944,9 @@ options are covered in [Testing](./testing.md).
<name>fs.s3a.acl.default</name>
<description>Set a canned ACL for newly created and copied objects. Value may be Private,
PublicRead, PublicReadWrite, AuthenticatedRead, LogDeliveryWrite, BucketOwnerRead,
or BucketOwnerFullControl.</description>
or BucketOwnerFullControl.
If set, caller IAM role must have "s3:PutObjectAcl" permission on the bucket.
</description>
</property>
<property>

View File

@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;

import java.util.List;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.Grant;
import com.amazonaws.services.s3.model.GroupGrantee;
import com.amazonaws.services.s3.model.Permission;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.audit.S3AAuditConstants;
import org.apache.hadoop.fs.s3a.impl.StoreContext;

import static org.apache.hadoop.fs.s3a.Constants.CANNED_ACL;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;

/**
 * Tests of ACL handling in the FS.
 * If you enable logging, the grantee list adds
 * Grant [grantee=GroupGrantee [http://acs.amazonaws.com/groups/s3/LogDelivery], permission=WRITE]
 */
public class ITestS3ACannedACLs extends AbstractS3ATestBase {

  private static final Logger LOG =
      LoggerFactory.getLogger(ITestS3ACannedACLs.class);

  @Override
  protected Configuration createConfiguration() {
    Configuration conf = super.createConfiguration();
    removeBaseAndBucketOverrides(conf,
        CANNED_ACL);
    conf.set(CANNED_ACL, LOG_DELIVERY_WRITE);
    // needed because of direct calls made
    conf.setBoolean(S3AAuditConstants.REJECT_OUT_OF_SPAN_OPERATIONS, false);
    return conf;
  }

  @Test
  public void testCreatedObjectsHaveACLs() throws Throwable {
    S3AFileSystem fs = getFileSystem();
    Path dir = methodPath();
    fs.mkdirs(dir);
    assertObjectHasLoggingGrant(dir, false);
    Path path = new Path(dir, "1");
    ContractTestUtils.touch(fs, path);
    assertObjectHasLoggingGrant(path, true);
    Path path2 = new Path(dir, "2");
    fs.rename(path, path2);
    assertObjectHasLoggingGrant(path2, true);
  }

  /**
   * Assert that a given object granted the AWS logging service
   * write access.
   * Logs all the grants.
   * @param path path
   * @param isFile is this a file or a directory?
   */
  private void assertObjectHasLoggingGrant(Path path, boolean isFile) {
    S3AFileSystem fs = getFileSystem();
    StoreContext storeContext = fs.createStoreContext();
    AmazonS3 s3 = fs.getAmazonS3ClientForTesting("acls");
    String key = storeContext.pathToKey(path);
    if (!isFile) {
      key = key + "/";
    }
    AccessControlList acl = s3.getObjectAcl(storeContext.getBucket(),
        key);
    List<Grant> grants = acl.getGrantsAsList();
    for (Grant grant : grants) {
      LOG.info("{}", grant.toString());
    }
    Grant loggingGrant = new Grant(GroupGrantee.LogDelivery, Permission.Write);
    Assertions.assertThat(grants)
        .describedAs("ACL grants of object %s", path)
        .contains(loggingGrant);
  }
}

View File

@@ -178,6 +178,11 @@ public interface S3ATestConstants {
"fs.s3a.s3guard.test.dynamo.table.prefix";
String TEST_S3GUARD_DYNAMO_TABLE_PREFIX_DEFAULT = "s3guard.test.";
/**
* ACL for S3 Logging; used in some tests: {@value}.
*/
String LOG_DELIVERY_WRITE = "LogDeliveryWrite";
/**
* Timeout in Milliseconds for standard tests: {@value}.
*/

View File

@@ -156,6 +156,9 @@ public class ITestSessionDelegationInFileystem extends AbstractDelegationIT {
}
// set the YARN RM up for YARN tests.
conf.set(YarnConfiguration.RM_PRINCIPAL, YARN_RM);
// turn on ACLs so as to verify role DT permissions include
// write access.
conf.set(CANNED_ACL, LOG_DELIVERY_WRITE);
return conf;
}

View File

@@ -24,8 +24,10 @@ import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,6 +75,37 @@ public class TestRequestFactory extends AbstractHadoopTestBase {
    createFactoryObjects(factory);
  }

  /**
   * Verify ACLs are passed from the factory to the requests.
   */
  @Test
  public void testRequestFactoryWithCannedACL() throws Throwable {
    CannedAccessControlList acl = CannedAccessControlList.BucketOwnerFullControl;
    RequestFactory factory = RequestFactoryImpl.builder()
        .withBucket("bucket")
        .withCannedACL(acl)
        .build();
    String path = "path";
    String path2 = "path2";
    ObjectMetadata md = factory.newObjectMetadata(128);
    Assertions.assertThat(
        factory.newPutObjectRequest(path, md,
            new ByteArrayInputStream(new byte[0]))
            .getCannedAcl())
        .describedAs("ACL of PUT")
        .isEqualTo(acl);
    Assertions.assertThat(factory.newCopyObjectRequest(path, path2, md)
        .getCannedAccessControlList())
        .describedAs("ACL of COPY")
        .isEqualTo(acl);
    Assertions.assertThat(factory.newMultipartUploadRequest(path)
        .getCannedACL())
        .describedAs("ACL of MPU")
        .isEqualTo(acl);
  }

  /**
   * Now add a processor and verify that it was invoked for
   * exactly as many requests as were analyzed.