HADOOP-17822. fs.s3a.acl.default not working after S3A Audit feature (#3249)
Fixes the regression caused by HADOOP-17511 by moving where the option fs.s3a.acl.default is read -doing it before the RequestFactory is created. Adds * A unit test in TestRequestFactory to verify the ACLs are set on all file write operations. * A new ITestS3ACannedACLs test which verifies that ACLs really do get all the way through. * S3A Assumed Role delegation tokens to include the IAM permission s3:PutObjectAcl in the generated role. Contributed by Steve Loughran
This commit is contained in:
parent
ee466d4b40
commit
4627e9c7ef
|
@ -1550,7 +1550,9 @@
|
||||||
<name>fs.s3a.acl.default</name>
|
<name>fs.s3a.acl.default</name>
|
||||||
<description>Set a canned ACL for newly created and copied objects. Value may be Private,
|
<description>Set a canned ACL for newly created and copied objects. Value may be Private,
|
||||||
PublicRead, PublicReadWrite, AuthenticatedRead, LogDeliveryWrite, BucketOwnerRead,
|
PublicRead, PublicReadWrite, AuthenticatedRead, LogDeliveryWrite, BucketOwnerRead,
|
||||||
or BucketOwnerFullControl.</description>
|
or BucketOwnerFullControl.
|
||||||
|
If set, caller IAM role must have "s3:PutObjectAcl" permission on the bucket.
|
||||||
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
|
|
|
@ -492,7 +492,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
||||||
|
|
||||||
initTransferManager();
|
initTransferManager();
|
||||||
|
|
||||||
initCannedAcls(conf);
|
|
||||||
|
|
||||||
// This initiates a probe against S3 for the bucket existing.
|
// This initiates a probe against S3 for the bucket existing.
|
||||||
doBucketProbing();
|
doBucketProbing();
|
||||||
|
@ -922,6 +921,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
||||||
UPLOAD_PART_COUNT_LIMIT);
|
UPLOAD_PART_COUNT_LIMIT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ACLs; this is passed to the
|
||||||
|
// request factory.
|
||||||
|
initCannedAcls(getConf());
|
||||||
|
|
||||||
return RequestFactoryImpl.builder()
|
return RequestFactoryImpl.builder()
|
||||||
.withBucket(requireNonNull(bucket))
|
.withBucket(requireNonNull(bucket))
|
||||||
.withCannedACL(getCannedACL())
|
.withCannedACL(getCannedACL())
|
||||||
|
|
|
@ -248,6 +248,7 @@ public final class RolePolicies {
|
||||||
Collections.unmodifiableList(Arrays.asList(new String[]{
|
Collections.unmodifiableList(Arrays.asList(new String[]{
|
||||||
S3_ALL_GET,
|
S3_ALL_GET,
|
||||||
S3_PUT_OBJECT,
|
S3_PUT_OBJECT,
|
||||||
|
S3_PUT_OBJECT_ACL,
|
||||||
S3_DELETE_OBJECT,
|
S3_DELETE_OBJECT,
|
||||||
S3_ABORT_MULTIPART_UPLOAD,
|
S3_ABORT_MULTIPART_UPLOAD,
|
||||||
}));
|
}));
|
||||||
|
@ -262,6 +263,7 @@ public final class RolePolicies {
|
||||||
public static final List<String> S3_PATH_WRITE_OPERATIONS =
|
public static final List<String> S3_PATH_WRITE_OPERATIONS =
|
||||||
Collections.unmodifiableList(Arrays.asList(new String[]{
|
Collections.unmodifiableList(Arrays.asList(new String[]{
|
||||||
S3_PUT_OBJECT,
|
S3_PUT_OBJECT,
|
||||||
|
S3_PUT_OBJECT_ACL,
|
||||||
S3_DELETE_OBJECT,
|
S3_DELETE_OBJECT,
|
||||||
S3_ABORT_MULTIPART_UPLOAD
|
S3_ABORT_MULTIPART_UPLOAD
|
||||||
}));
|
}));
|
||||||
|
@ -274,6 +276,7 @@ public final class RolePolicies {
|
||||||
Collections.unmodifiableList(Arrays.asList(new String[]{
|
Collections.unmodifiableList(Arrays.asList(new String[]{
|
||||||
S3_ALL_GET,
|
S3_ALL_GET,
|
||||||
S3_PUT_OBJECT,
|
S3_PUT_OBJECT,
|
||||||
|
S3_PUT_OBJECT_ACL,
|
||||||
S3_DELETE_OBJECT,
|
S3_DELETE_OBJECT,
|
||||||
S3_ABORT_MULTIPART_UPLOAD,
|
S3_ABORT_MULTIPART_UPLOAD,
|
||||||
}));
|
}));
|
||||||
|
|
|
@ -944,7 +944,9 @@ options are covered in [Testing](./testing.md).
|
||||||
<name>fs.s3a.acl.default</name>
|
<name>fs.s3a.acl.default</name>
|
||||||
<description>Set a canned ACL for newly created and copied objects. Value may be Private,
|
<description>Set a canned ACL for newly created and copied objects. Value may be Private,
|
||||||
PublicRead, PublicReadWrite, AuthenticatedRead, LogDeliveryWrite, BucketOwnerRead,
|
PublicRead, PublicReadWrite, AuthenticatedRead, LogDeliveryWrite, BucketOwnerRead,
|
||||||
or BucketOwnerFullControl.</description>
|
or BucketOwnerFullControl.
|
||||||
|
If set, caller IAM role must have "s3:PutObjectAcl" permission on the bucket.
|
||||||
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
|
|
|
@ -0,0 +1,105 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import com.amazonaws.services.s3.AmazonS3;
|
||||||
|
import com.amazonaws.services.s3.model.AccessControlList;
|
||||||
|
import com.amazonaws.services.s3.model.Grant;
|
||||||
|
import com.amazonaws.services.s3.model.GroupGrantee;
|
||||||
|
import com.amazonaws.services.s3.model.Permission;
|
||||||
|
import org.assertj.core.api.Assertions;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||||
|
import org.apache.hadoop.fs.s3a.audit.S3AAuditConstants;
|
||||||
|
import org.apache.hadoop.fs.s3a.impl.StoreContext;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.CANNED_ACL;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests of ACL handling in the FS.
|
||||||
|
* If you enable logging, the grantee list adds
|
||||||
|
* Grant [grantee=GroupGrantee [http://acs.amazonaws.com/groups/s3/LogDelivery], permission=WRITE]
|
||||||
|
*/
|
||||||
|
public class ITestS3ACannedACLs extends AbstractS3ATestBase {
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(ITestS3ACannedACLs.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Configuration createConfiguration() {
|
||||||
|
Configuration conf = super.createConfiguration();
|
||||||
|
removeBaseAndBucketOverrides(conf,
|
||||||
|
CANNED_ACL);
|
||||||
|
|
||||||
|
conf.set(CANNED_ACL, LOG_DELIVERY_WRITE);
|
||||||
|
// needed because of direct calls made
|
||||||
|
conf.setBoolean(S3AAuditConstants.REJECT_OUT_OF_SPAN_OPERATIONS, false);
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreatedObjectsHaveACLs() throws Throwable {
|
||||||
|
S3AFileSystem fs = getFileSystem();
|
||||||
|
Path dir = methodPath();
|
||||||
|
fs.mkdirs(dir);
|
||||||
|
assertObjectHasLoggingGrant(dir, false);
|
||||||
|
Path path = new Path(dir, "1");
|
||||||
|
ContractTestUtils.touch(fs, path);
|
||||||
|
assertObjectHasLoggingGrant(path, true);
|
||||||
|
Path path2 = new Path(dir, "2");
|
||||||
|
fs.rename(path, path2);
|
||||||
|
assertObjectHasLoggingGrant(path2, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert that a given object granted the AWS logging service
|
||||||
|
* write access.
|
||||||
|
* Logs all the grants.
|
||||||
|
* @param path path
|
||||||
|
* @param isFile is this a file or a directory?
|
||||||
|
*/
|
||||||
|
private void assertObjectHasLoggingGrant(Path path, boolean isFile) {
|
||||||
|
S3AFileSystem fs = getFileSystem();
|
||||||
|
|
||||||
|
StoreContext storeContext = fs.createStoreContext();
|
||||||
|
AmazonS3 s3 = fs.getAmazonS3ClientForTesting("acls");
|
||||||
|
String key = storeContext.pathToKey(path);
|
||||||
|
if (!isFile) {
|
||||||
|
key = key + "/";
|
||||||
|
}
|
||||||
|
AccessControlList acl = s3.getObjectAcl(storeContext.getBucket(),
|
||||||
|
key);
|
||||||
|
List<Grant> grants = acl.getGrantsAsList();
|
||||||
|
for (Grant grant : grants) {
|
||||||
|
LOG.info("{}", grant.toString());
|
||||||
|
}
|
||||||
|
Grant loggingGrant = new Grant(GroupGrantee.LogDelivery, Permission.Write);
|
||||||
|
Assertions.assertThat(grants)
|
||||||
|
.describedAs("ACL grants of object %s", path)
|
||||||
|
.contains(loggingGrant);
|
||||||
|
}
|
||||||
|
}
|
|
@ -178,6 +178,11 @@ public interface S3ATestConstants {
|
||||||
"fs.s3a.s3guard.test.dynamo.table.prefix";
|
"fs.s3a.s3guard.test.dynamo.table.prefix";
|
||||||
String TEST_S3GUARD_DYNAMO_TABLE_PREFIX_DEFAULT = "s3guard.test.";
|
String TEST_S3GUARD_DYNAMO_TABLE_PREFIX_DEFAULT = "s3guard.test.";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ACL for S3 Logging; used in some tests: {@value}.
|
||||||
|
*/
|
||||||
|
String LOG_DELIVERY_WRITE = "LogDeliveryWrite";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Timeout in Milliseconds for standard tests: {@value}.
|
* Timeout in Milliseconds for standard tests: {@value}.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -161,6 +161,9 @@ public class ITestSessionDelegationInFileystem extends AbstractDelegationIT {
|
||||||
}
|
}
|
||||||
// set the YARN RM up for YARN tests.
|
// set the YARN RM up for YARN tests.
|
||||||
conf.set(YarnConfiguration.RM_PRINCIPAL, YARN_RM);
|
conf.set(YarnConfiguration.RM_PRINCIPAL, YARN_RM);
|
||||||
|
// turn on ACLs so as to verify role DT permissions include
|
||||||
|
// write access.
|
||||||
|
conf.set(CANNED_ACL, LOG_DELIVERY_WRITE);
|
||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,8 +24,10 @@ import java.util.ArrayList;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import com.amazonaws.AmazonWebServiceRequest;
|
import com.amazonaws.AmazonWebServiceRequest;
|
||||||
|
import com.amazonaws.services.s3.model.CannedAccessControlList;
|
||||||
import com.amazonaws.services.s3.model.ObjectListing;
|
import com.amazonaws.services.s3.model.ObjectListing;
|
||||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||||
|
import org.assertj.core.api.Assertions;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -73,6 +75,37 @@ public class TestRequestFactory extends AbstractHadoopTestBase {
|
||||||
createFactoryObjects(factory);
|
createFactoryObjects(factory);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify ACLs are passed from the factory to the requests.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRequestFactoryWithCannedACL() throws Throwable {
|
||||||
|
CannedAccessControlList acl = CannedAccessControlList.BucketOwnerFullControl;
|
||||||
|
RequestFactory factory = RequestFactoryImpl.builder()
|
||||||
|
.withBucket("bucket")
|
||||||
|
.withCannedACL(acl)
|
||||||
|
.build();
|
||||||
|
String path = "path";
|
||||||
|
String path2 = "path2";
|
||||||
|
ObjectMetadata md = factory.newObjectMetadata(128);
|
||||||
|
Assertions.assertThat(
|
||||||
|
factory.newPutObjectRequest(path, md,
|
||||||
|
new ByteArrayInputStream(new byte[0]))
|
||||||
|
.getCannedAcl())
|
||||||
|
.describedAs("ACL of PUT")
|
||||||
|
.isEqualTo(acl);
|
||||||
|
Assertions.assertThat(factory.newCopyObjectRequest(path, path2, md)
|
||||||
|
.getCannedAccessControlList())
|
||||||
|
.describedAs("ACL of COPY")
|
||||||
|
.isEqualTo(acl);
|
||||||
|
Assertions.assertThat(factory.newMultipartUploadRequest(path)
|
||||||
|
.getCannedACL())
|
||||||
|
.describedAs("ACL of MPU")
|
||||||
|
.isEqualTo(acl);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Now add a processor and verify that it was invoked for
|
* Now add a processor and verify that it was invoked for
|
||||||
* exactly as many requests as were analyzed.
|
* exactly as many requests as were analyzed.
|
||||||
|
|
Loading…
Reference in New Issue