mirror of https://github.com/apache/nifi.git
NIFI-2507 Added Canned ACL support to PutS3Object
This closes #801 Signed-off-by: James Wing <jvwing@gmail.com>
This commit is contained in:
parent
b2401522ea
commit
45c31c8305
|
@ -34,6 +34,7 @@ import com.amazonaws.regions.Region;
|
||||||
import com.amazonaws.services.s3.AmazonS3Client;
|
import com.amazonaws.services.s3.AmazonS3Client;
|
||||||
import com.amazonaws.services.s3.S3ClientOptions;
|
import com.amazonaws.services.s3.S3ClientOptions;
|
||||||
import com.amazonaws.services.s3.model.AccessControlList;
|
import com.amazonaws.services.s3.model.AccessControlList;
|
||||||
|
import com.amazonaws.services.s3.model.CannedAccessControlList;
|
||||||
import com.amazonaws.services.s3.model.CanonicalGrantee;
|
import com.amazonaws.services.s3.model.CanonicalGrantee;
|
||||||
import com.amazonaws.services.s3.model.EmailAddressGrantee;
|
import com.amazonaws.services.s3.model.EmailAddressGrantee;
|
||||||
import com.amazonaws.services.s3.model.Grantee;
|
import com.amazonaws.services.s3.model.Grantee;
|
||||||
|
@ -82,6 +83,16 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
|
||||||
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have permissions to change the Access Control List for an object")
|
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have permissions to change the Access Control List for an object")
|
||||||
.defaultValue("${s3.permissions.writeacl.users}")
|
.defaultValue("${s3.permissions.writeacl.users}")
|
||||||
.build();
|
.build();
|
||||||
|
public static final PropertyDescriptor CANNED_ACL = new PropertyDescriptor.Builder()
|
||||||
|
.name("canned-acl")
|
||||||
|
.displayName("Canned ACL")
|
||||||
|
.required(false)
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.description("Amazon Canned ACL for an object, one of: BucketOwnerFullControl, BucketOwnerRead, LogDeliveryWrite, AuthenticatedRead, PublicReadWrite, PublicRead, Private; " +
|
||||||
|
"will be ignored if any other ACL/permission/owner property is specified")
|
||||||
|
.defaultValue("${s3.permissions.cannedacl}")
|
||||||
|
.build();
|
||||||
public static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
|
||||||
.name("Owner")
|
.name("Owner")
|
||||||
.required(false)
|
.required(false)
|
||||||
|
@ -183,36 +194,80 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create AccessControlList if appropriate properties are configured.
|
||||||
|
*
|
||||||
|
* @param context ProcessContext
|
||||||
|
* @param flowFile FlowFile
|
||||||
|
* @return AccessControlList or null if no ACL properties were specified
|
||||||
|
*/
|
||||||
protected final AccessControlList createACL(final ProcessContext context, final FlowFile flowFile) {
|
protected final AccessControlList createACL(final ProcessContext context, final FlowFile flowFile) {
|
||||||
final AccessControlList acl = new AccessControlList();
|
// lazy-initialize ACL, as it should not be used if no properties were specified
|
||||||
|
AccessControlList acl = null;
|
||||||
|
|
||||||
final String ownerId = context.getProperty(OWNER).evaluateAttributeExpressions(flowFile).getValue();
|
final String ownerId = context.getProperty(OWNER).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
if (!StringUtils.isEmpty(ownerId)) {
|
if (!StringUtils.isEmpty(ownerId)) {
|
||||||
final Owner owner = new Owner();
|
final Owner owner = new Owner();
|
||||||
owner.setId(ownerId);
|
owner.setId(ownerId);
|
||||||
|
if (acl == null) {
|
||||||
|
acl = new AccessControlList();
|
||||||
|
}
|
||||||
acl.setOwner(owner);
|
acl.setOwner(owner);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final Grantee grantee : createGrantees(context.getProperty(FULL_CONTROL_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
|
for (final Grantee grantee : createGrantees(context.getProperty(FULL_CONTROL_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
|
||||||
|
if (acl == null) {
|
||||||
|
acl = new AccessControlList();
|
||||||
|
}
|
||||||
acl.grantPermission(grantee, Permission.FullControl);
|
acl.grantPermission(grantee, Permission.FullControl);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final Grantee grantee : createGrantees(context.getProperty(READ_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
|
for (final Grantee grantee : createGrantees(context.getProperty(READ_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
|
||||||
|
if (acl == null) {
|
||||||
|
acl = new AccessControlList();
|
||||||
|
}
|
||||||
acl.grantPermission(grantee, Permission.Read);
|
acl.grantPermission(grantee, Permission.Read);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final Grantee grantee : createGrantees(context.getProperty(WRITE_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
|
for (final Grantee grantee : createGrantees(context.getProperty(WRITE_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
|
||||||
|
if (acl == null) {
|
||||||
|
acl = new AccessControlList();
|
||||||
|
}
|
||||||
acl.grantPermission(grantee, Permission.Write);
|
acl.grantPermission(grantee, Permission.Write);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final Grantee grantee : createGrantees(context.getProperty(READ_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
|
for (final Grantee grantee : createGrantees(context.getProperty(READ_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
|
||||||
|
if (acl == null) {
|
||||||
|
acl = new AccessControlList();
|
||||||
|
}
|
||||||
acl.grantPermission(grantee, Permission.ReadAcp);
|
acl.grantPermission(grantee, Permission.ReadAcp);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final Grantee grantee : createGrantees(context.getProperty(WRITE_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
|
for (final Grantee grantee : createGrantees(context.getProperty(WRITE_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
|
||||||
|
if (acl == null) {
|
||||||
|
acl = new AccessControlList();
|
||||||
|
}
|
||||||
acl.grantPermission(grantee, Permission.WriteAcp);
|
acl.grantPermission(grantee, Permission.WriteAcp);
|
||||||
}
|
}
|
||||||
|
|
||||||
return acl;
|
return acl;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create CannedAccessControlList if {@link #CANNED_ACL} property specified.
|
||||||
|
*
|
||||||
|
* @param context ProcessContext
|
||||||
|
* @param flowFile FlowFile
|
||||||
|
* @return CannedAccessControlList or null if not specified
|
||||||
|
*/
|
||||||
|
protected final CannedAccessControlList createCannedACL(final ProcessContext context, final FlowFile flowFile) {
|
||||||
|
CannedAccessControlList cannedAcl = null;
|
||||||
|
|
||||||
|
final String cannedAclString = context.getProperty(CANNED_ACL).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
if (!StringUtils.isEmpty(cannedAclString)) {
|
||||||
|
cannedAcl = CannedAccessControlList.valueOf(cannedAclString);
|
||||||
|
}
|
||||||
|
|
||||||
|
return cannedAcl;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,6 +64,7 @@ import com.amazonaws.AmazonClientException;
|
||||||
import com.amazonaws.services.s3.AmazonS3Client;
|
import com.amazonaws.services.s3.AmazonS3Client;
|
||||||
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
|
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
|
||||||
import com.amazonaws.services.s3.model.AccessControlList;
|
import com.amazonaws.services.s3.model.AccessControlList;
|
||||||
|
import com.amazonaws.services.s3.model.CannedAccessControlList;
|
||||||
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
|
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
|
||||||
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
|
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
|
||||||
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
|
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
|
||||||
|
@ -190,7 +191,7 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
|
|
||||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||||
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
|
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
|
||||||
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, SSL_CONTEXT_SERVICE,
|
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, CANNED_ACL, SSL_CONTEXT_SERVICE,
|
||||||
ENDPOINT_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION,
|
ENDPOINT_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION,
|
||||||
PROXY_HOST, PROXY_HOST_PORT));
|
PROXY_HOST, PROXY_HOST_PORT));
|
||||||
|
|
||||||
|
@ -441,6 +442,10 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
if (acl != null) {
|
if (acl != null) {
|
||||||
request.setAccessControlList(acl);
|
request.setAccessControlList(acl);
|
||||||
}
|
}
|
||||||
|
final CannedAccessControlList cannedAcl = createCannedACL(context, ff);
|
||||||
|
if (cannedAcl != null) {
|
||||||
|
request.withCannedAcl(cannedAcl);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final PutObjectResult result = s3.putObject(request);
|
final PutObjectResult result = s3.putObject(request);
|
||||||
|
@ -529,6 +534,10 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
if (acl != null) {
|
if (acl != null) {
|
||||||
initiateRequest.setAccessControlList(acl);
|
initiateRequest.setAccessControlList(acl);
|
||||||
}
|
}
|
||||||
|
final CannedAccessControlList cannedAcl = createCannedACL(context, ff);
|
||||||
|
if (cannedAcl != null) {
|
||||||
|
initiateRequest.withCannedACL(cannedAcl);
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
final InitiateMultipartUploadResult initiateResult =
|
final InitiateMultipartUploadResult initiateResult =
|
||||||
s3.initiateMultipartUpload(initiateRequest);
|
s3.initiateMultipartUpload(initiateRequest);
|
||||||
|
|
|
@ -314,10 +314,11 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||||
public void testGetPropertyDescriptors() throws Exception {
|
public void testGetPropertyDescriptors() throws Exception {
|
||||||
PutS3Object processor = new PutS3Object();
|
PutS3Object processor = new PutS3Object();
|
||||||
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
||||||
assertEquals("size should be eq", 25, pd.size());
|
assertEquals("size should be eq", 26, pd.size());
|
||||||
assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
|
assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
|
||||||
assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
||||||
assertTrue(pd.contains(PutS3Object.BUCKET));
|
assertTrue(pd.contains(PutS3Object.BUCKET));
|
||||||
|
assertTrue(pd.contains(PutS3Object.CANNED_ACL));
|
||||||
assertTrue(pd.contains(PutS3Object.CREDENTIALS_FILE));
|
assertTrue(pd.contains(PutS3Object.CREDENTIALS_FILE));
|
||||||
assertTrue(pd.contains(PutS3Object.ENDPOINT_OVERRIDE));
|
assertTrue(pd.contains(PutS3Object.ENDPOINT_OVERRIDE));
|
||||||
assertTrue(pd.contains(PutS3Object.FULL_CONTROL_USER_LIST));
|
assertTrue(pd.contains(PutS3Object.FULL_CONTROL_USER_LIST));
|
||||||
|
|
Loading…
Reference in New Issue