diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java index 122231ae5e..b66468ad73 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java @@ -32,6 +32,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; @@ -73,10 +74,23 @@ public class FetchS3Object extends AbstractS3Processor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(false) .build(); + public static final PropertyDescriptor REQUESTER_PAYS = new PropertyDescriptor.Builder() + .name("requester-pays") + .displayName("Requester Pays") + .required(true) + .description("If true, indicates that the requester consents to pay any charges associated with retrieving objects from " + + "the S3 bucket. This sets the 'x-amz-request-payer' header to 'requester'.") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .allowableValues(new AllowableValue("true", "True", "Indicates that the requester consents to pay any charges associated " + + "with retrieving objects from the S3 bucket."), new AllowableValue("false", "False", "Does not consent to pay " + + "requester charges for retrieving objects from the S3 bucket.")) + .defaultValue("false") + .build(); public static final List properties = Collections.unmodifiableList( Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID, - SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD)); + SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, + REQUESTER_PAYS)); @Override protected List getSupportedPropertyDescriptors() { @@ -94,6 +108,7 @@ public class FetchS3Object extends AbstractS3Processor { final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue(); final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue(); + final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean(); final AmazonS3 client = getClient(); final GetObjectRequest request; @@ -102,6 +117,7 @@ public class FetchS3Object extends AbstractS3Processor { } else { request = new GetObjectRequest(bucket, key, versionId); } + request.setRequesterPays(requesterPays); final Map attributes = new HashMap<>(); try (final S3Object s3Object = client.getObject(request)) { diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java index fb4e49fbbe..34c1dece0a 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java @@ -52,6 +52,9 @@ import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.expression.ExpressionLanguageScope; @@ -153,6 +156,19 @@ public class ListS3 extends AbstractS3Processor { .allowableValues(new AllowableValue("true", "True"), new AllowableValue("false", "False")) .defaultValue("false") .build(); + public static final PropertyDescriptor REQUESTER_PAYS = new PropertyDescriptor.Builder() + .name("requester-pays") + .displayName("Requester Pays") + .required(true) + .description("If true, indicates that the requester consents to pay any charges associated with listing " + + "the S3 bucket. This sets the 'x-amz-request-payer' header to 'requester'. Note that this " + + "setting is not applicable when 'Use Versions' is 'true'.") + .addValidator(createRequesterPaysValidator()) + .allowableValues(new AllowableValue("true", "True", "Indicates that the requester consents to pay any charges associated " + + "with listing the S3 bucket."), new AllowableValue("false", "False", "Does not consent to pay " + + "requester charges for listing the S3 bucket.")) + .defaultValue("false") + .build(); public static final PropertyDescriptor WRITE_USER_METADATA = new PropertyDescriptor.Builder() .name("write-s3-user-metadata") @@ -168,7 +184,7 @@ public class ListS3 extends AbstractS3Processor { Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, WRITE_OBJECT_TAGS, WRITE_USER_METADATA, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, - PROXY_PASSWORD, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE)); + PROXY_PASSWORD, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE, REQUESTER_PAYS)); public static final Set relationships = Collections.unmodifiableSet( new HashSet<>(Collections.singletonList(REL_SUCCESS))); @@ -180,6 +196,23 @@ public class ListS3 extends AbstractS3Processor { private long currentTimestamp = 0L; private Set currentKeys; + private static Validator createRequesterPaysValidator() { + return new Validator() { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + boolean requesterPays = Boolean.valueOf(input); + boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean(); + boolean valid = !requesterPays || !useVersions; + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(valid) + .explanation(valid ? null : "'Requester Pays' cannot be used when listing object versions.") + .build(); + } + }; + } + @Override protected List getSupportedPropertyDescriptors() { return properties; @@ -240,6 +273,7 @@ public class ListS3 extends AbstractS3Processor { final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(); final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); final long listingTimestamp = System.currentTimeMillis(); + final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean(); final AmazonS3 client = getClient(); int listCount = 0; @@ -257,6 +291,7 @@ public class ListS3 extends AbstractS3Processor { : new S3ObjectBucketLister(client); bucketLister.setBucketName(bucket); + bucketLister.setRequesterPays(requesterPays); if (delimiter != null && !delimiter.isEmpty()) { bucketLister.setDelimiter(delimiter); @@ -386,6 +421,7 @@ public class ListS3 extends AbstractS3Processor { public void setBucketName(String bucketName); public void setPrefix(String prefix); public void setDelimiter(String delimiter); + public void setRequesterPays(boolean requesterPays); // Versions have a superset of the fields that Objects have, so we'll use // them as a common interface public VersionListing listVersions(); @@ -417,6 +453,11 @@ public class ListS3 extends AbstractS3Processor { listObjectsRequest.setDelimiter(delimiter); } + @Override + public void setRequesterPays(boolean requesterPays) { + listObjectsRequest.setRequesterPays(requesterPays); + } + @Override public VersionListing listVersions() { VersionListing versionListing = new VersionListing(); @@ -473,6 +514,11 @@ public class ListS3 extends AbstractS3Processor { listObjectsRequest.setDelimiter(delimiter); } + @Override + public void setRequesterPays(boolean requesterPays) { + listObjectsRequest.setRequesterPays(requesterPays); + } + @Override public VersionListing listVersions() { VersionListing versionListing = new VersionListing(); @@ -529,6 +575,11 @@ public class ListS3 extends AbstractS3Processor { listVersionsRequest.setDelimiter(delimiter); } + @Override + public void setRequesterPays(boolean requesterPays) { + // Not supported in versionListing, so this does nothing. + } + @Override public VersionListing listVersions() { versionListing = client.listVersions(listVersionsRequest); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java index bcfff236c4..4326ed7e51 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java @@ -43,6 +43,7 @@ import org.mockito.Mockito; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -100,6 +101,65 @@ public class TestFetchS3Object { GetObjectRequest request = captureRequest.getValue(); assertEquals("request-bucket", request.getBucketName()); assertEquals("request-key", request.getKey()); + assertFalse(request.isRequesterPays()); + assertNull(request.getVersionId()); + + runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1); + final List ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS); + MockFlowFile ff = ffs.get(0); + ff.assertAttributeEquals("s3.bucket", "response-bucket-name"); + ff.assertAttributeEquals(CoreAttributes.FILENAME.key(), "file.txt"); + ff.assertAttributeEquals(CoreAttributes.PATH.key(), "key/path/to"); + ff.assertAttributeEquals(CoreAttributes.ABSOLUTE_PATH.key(), "key/path/to/file.txt"); + ff.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain"); + ff.assertAttributeEquals("hash.value", "testMD5hash"); + ff.assertAttributeEquals("hash.algorithm", "MD5"); + ff.assertAttributeEquals("s3.etag", "test-etag"); + ff.assertAttributeEquals("s3.expirationTime", String.valueOf(expiration.getTime())); + ff.assertAttributeEquals("s3.expirationTimeRuleId", "testExpirationRuleId"); + ff.assertAttributeEquals("userKey1", "userValue1"); + ff.assertAttributeEquals("userKey2", "userValue2"); + ff.assertAttributeEquals("s3.sseAlgorithm", "testAlgorithm"); + ff.assertContentEquals("Some Content"); + } + + @Test + public void testGetObjectWithRequesterPays() throws IOException { + runner.setProperty(FetchS3Object.REGION, "us-east-1"); + runner.setProperty(FetchS3Object.BUCKET, "request-bucket"); + runner.setProperty(FetchS3Object.REQUESTER_PAYS, "true"); + final Map attrs = new HashMap<>(); + attrs.put("filename", "request-key"); + runner.enqueue(new byte[0], attrs); + + S3Object s3ObjectResponse = new S3Object(); + s3ObjectResponse.setBucketName("response-bucket-name"); + s3ObjectResponse.setKey("response-key"); + s3ObjectResponse.setObjectContent(new StringInputStream("Some Content")); + ObjectMetadata metadata = Mockito.spy(ObjectMetadata.class); + metadata.setContentDisposition("key/path/to/file.txt"); + metadata.setContentType("text/plain"); + metadata.setContentMD5("testMD5hash"); + Date expiration = new Date(); + metadata.setExpirationTime(expiration); + metadata.setExpirationTimeRuleId("testExpirationRuleId"); + Map userMetadata = new HashMap<>(); + userMetadata.put("userKey1", "userValue1"); + userMetadata.put("userKey2", "userValue2"); + metadata.setUserMetadata(userMetadata); + metadata.setSSEAlgorithm("testAlgorithm"); + Mockito.when(metadata.getETag()).thenReturn("test-etag"); + s3ObjectResponse.setObjectMetadata(metadata); + Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse); + + runner.run(1); + + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(GetObjectRequest.class); + Mockito.verify(mockS3Client, Mockito.times(1)).getObject(captureRequest.capture()); + GetObjectRequest request = captureRequest.getValue(); + assertEquals("request-bucket", request.getBucketName()); + assertEquals("request-key", request.getKey()); + assertTrue(request.isRequesterPays()); assertNull(request.getVersionId()); runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1); @@ -179,7 +239,7 @@ public class TestFetchS3Object { public void testGetPropertyDescriptors() throws Exception { FetchS3Object processor = new FetchS3Object(); List pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 17, pd.size()); + assertEquals("size should be eq", 18, pd.size()); assertTrue(pd.contains(FetchS3Object.ACCESS_KEY)); assertTrue(pd.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(FetchS3Object.BUCKET)); @@ -197,6 +257,7 @@ public class TestFetchS3Object { assertTrue(pd.contains(FetchS3Object.PROXY_HOST_PORT)); assertTrue(pd.contains(FetchS3Object.PROXY_USERNAME)); assertTrue(pd.contains(FetchS3Object.PROXY_PASSWORD)); + assertTrue(pd.contains(FetchS3Object.REQUESTER_PAYS)); } } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java index 3798d84391..86af835f2c 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java @@ -50,6 +50,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -103,6 +104,7 @@ public class TestListS3 { Mockito.verify(mockS3Client, Mockito.times(1)).listObjects(captureRequest.capture()); ListObjectsRequest request = captureRequest.getValue(); assertEquals("test-bucket", request.getBucketName()); + assertFalse(request.isRequesterPays()); Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any()); runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3); @@ -117,6 +119,62 @@ public class TestListS3 { runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER); } + @Test + public void testListWithRequesterPays() { + runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(ListS3.BUCKET, "test-bucket"); + runner.setProperty(ListS3.REQUESTER_PAYS, "true"); + + Date lastModified = new Date(); + ObjectListing objectListing = new ObjectListing(); + S3ObjectSummary objectSummary1 = new S3ObjectSummary(); + objectSummary1.setBucketName("test-bucket"); + objectSummary1.setKey("a"); + objectSummary1.setLastModified(lastModified); + objectListing.getObjectSummaries().add(objectSummary1); + S3ObjectSummary objectSummary2 = new S3ObjectSummary(); + objectSummary2.setBucketName("test-bucket"); + objectSummary2.setKey("b/c"); + objectSummary2.setLastModified(lastModified); + objectListing.getObjectSummaries().add(objectSummary2); + S3ObjectSummary objectSummary3 = new S3ObjectSummary(); + objectSummary3.setBucketName("test-bucket"); + objectSummary3.setKey("d/e"); + objectSummary3.setLastModified(lastModified); + objectListing.getObjectSummaries().add(objectSummary3); + Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing); + + runner.run(); + + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(ListObjectsRequest.class); + Mockito.verify(mockS3Client, Mockito.times(1)).listObjects(captureRequest.capture()); + ListObjectsRequest request = captureRequest.getValue(); + assertEquals("test-bucket", request.getBucketName()); + assertTrue(request.isRequesterPays()); + Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any()); + + runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3); + List flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS); + MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeEquals("filename", "a"); + ff0.assertAttributeEquals("s3.bucket", "test-bucket"); + String lastModifiedTimestamp = String.valueOf(lastModified.getTime()); + ff0.assertAttributeEquals("s3.lastModified", lastModifiedTimestamp); + flowFiles.get(1).assertAttributeEquals("filename", "b/c"); + flowFiles.get(2).assertAttributeEquals("filename", "d/e"); + runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER); + } + + @Test + public void testListWithRequesterPays_invalid() { + runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(ListS3.BUCKET, "test-bucket"); + runner.setProperty(ListS3.USE_VERSIONS, "true"); // requester pays cannot be used with versions + runner.setProperty(ListS3.REQUESTER_PAYS, "true"); + + runner.assertNotValid(); + } + @Test public void testListVersion2() { runner.setProperty(ListS3.REGION, "eu-west-1"); @@ -148,6 +206,54 @@ public class TestListS3 { Mockito.verify(mockS3Client, Mockito.times(1)).listObjectsV2(captureRequest.capture()); ListObjectsV2Request request = captureRequest.getValue(); assertEquals("test-bucket", request.getBucketName()); + assertFalse(request.isRequesterPays()); + Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any()); + + runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3); + List flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS); + MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeEquals("filename", "a"); + ff0.assertAttributeEquals("s3.bucket", "test-bucket"); + String lastModifiedTimestamp = String.valueOf(lastModified.getTime()); + ff0.assertAttributeEquals("s3.lastModified", lastModifiedTimestamp); + flowFiles.get(1).assertAttributeEquals("filename", "b/c"); + flowFiles.get(2).assertAttributeEquals("filename", "d/e"); + runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER); + } + + @Test + public void testListVersion2WithRequesterPays() { + runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(ListS3.BUCKET, "test-bucket"); + runner.setProperty(ListS3.REQUESTER_PAYS, "true"); + runner.setProperty(ListS3.LIST_TYPE, "2"); + + Date lastModified = new Date(); + ListObjectsV2Result objectListing = new ListObjectsV2Result(); + S3ObjectSummary objectSummary1 = new S3ObjectSummary(); + objectSummary1.setBucketName("test-bucket"); + objectSummary1.setKey("a"); + objectSummary1.setLastModified(lastModified); + objectListing.getObjectSummaries().add(objectSummary1); + S3ObjectSummary objectSummary2 = new S3ObjectSummary(); + objectSummary2.setBucketName("test-bucket"); + objectSummary2.setKey("b/c"); + objectSummary2.setLastModified(lastModified); + objectListing.getObjectSummaries().add(objectSummary2); + S3ObjectSummary objectSummary3 = new S3ObjectSummary(); + objectSummary3.setBucketName("test-bucket"); + objectSummary3.setKey("d/e"); + objectSummary3.setLastModified(lastModified); + objectListing.getObjectSummaries().add(objectSummary3); + Mockito.when(mockS3Client.listObjectsV2(Mockito.any(ListObjectsV2Request.class))).thenReturn(objectListing); + + runner.run(); + + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(ListObjectsV2Request.class); + Mockito.verify(mockS3Client, Mockito.times(1)).listObjectsV2(captureRequest.capture()); + ListObjectsV2Request request = captureRequest.getValue(); + assertEquals("test-bucket", request.getBucketName()); + assertTrue(request.isRequesterPays()); Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any()); runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3); @@ -375,5 +481,6 @@ public class TestListS3 { assertTrue(pd.contains(ListS3.PROXY_HOST_PORT)); assertTrue(pd.contains(ListS3.PROXY_USERNAME)); assertTrue(pd.contains(ListS3.PROXY_PASSWORD)); + assertTrue(pd.contains(ListS3.REQUESTER_PAYS)); } }