diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml index a1ba7757c4..190184612e 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml @@ -78,13 +78,6 @@ com.amazonaws aws-java-sdk-sts - - - com.fasterxml.jackson.core - jackson-databind - 2.6.6 - test - diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java index 3e037f8bb1..f5a69acb59 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java @@ -26,6 +26,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import com.amazonaws.services.s3.model.GetObjectTaggingRequest; +import com.amazonaws.services.s3.model.GetObjectTaggingResult; +import com.amazonaws.services.s3.model.Tag; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; @@ -80,7 +83,9 @@ import com.amazonaws.services.s3.model.ListObjectsV2Result; @WritesAttribute(attribute = "s3.lastModified", description = "The last modified time in milliseconds since epoch in UTC time"), @WritesAttribute(attribute = "s3.length", description = "The size of the object in bytes"), @WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"), - @WritesAttribute(attribute = "s3.version", description = "The version of the object, if applicable")}) + @WritesAttribute(attribute = "s3.version", description = "The version of the object, if applicable"), + @WritesAttribute(attribute = "s3.tag.___", description = "If 'Write Object Tags' is set to 'True', the tags associated to the S3 object that is being listed " + + "will be written as part of the flowfile attributes")}) @SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class}) public class ListS3 extends AbstractS3Processor { @@ -136,11 +141,20 @@ public class ListS3 extends AbstractS3Processor { .defaultValue("0 sec") .build(); + public static final PropertyDescriptor WRITE_OBJECT_TAGS = new PropertyDescriptor.Builder() + .name("write-s3-object-tags") + .displayName("Write Object Tags") + .description("If set to 'True', the tags associated with the S3 object will be written as FlowFile attributes") + .required(true) + .allowableValues(new AllowableValue("true", "True"), new AllowableValue("false", "False")) + .defaultValue("false") + .build(); + public static final List properties = Collections.unmodifiableList( - Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, + Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, WRITE_OBJECT_TAGS, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, - SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, - DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE)); + SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, + PROXY_PASSWORD, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE)); public static final Set relationships = Collections.unmodifiableSet( new HashSet<>(Collections.singletonList(REL_SUCCESS))); @@ -263,6 +277,10 @@ public class ListS3 extends AbstractS3Processor { attributes.put("s3.version", versionSummary.getVersionId()); } + if (context.getProperty(WRITE_OBJECT_TAGS).asBoolean()) { + attributes.putAll(writeObjectTags(client, versionSummary)); + } + // Create the flowfile FlowFile flowFile = session.create(); flowFile = session.putAllAttributes(flowFile, attributes); @@ -307,6 +325,20 @@ public class ListS3 extends AbstractS3Processor { return willCommit; } + private Map writeObjectTags(AmazonS3 client, S3VersionSummary versionSummary) { + final GetObjectTaggingResult taggingResult = client.getObjectTagging(new GetObjectTaggingRequest(versionSummary.getBucketName(), versionSummary.getKey())); + final Map tagMap = new HashMap<>(); + + if (taggingResult != null) { + final List tags = taggingResult.getTagSet(); + + for (final Tag tag : tags) { + tagMap.put("s3.tag." + tag.getKey(), tag.getValue()); + } + } + return tagMap; + } + private interface S3BucketLister { public void setBucketName(String bucketName); public void setPrefix(String prefix); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java index f856b85109..49a649e8c4 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -39,6 +39,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import com.amazonaws.services.s3.model.ObjectTagging; +import com.amazonaws.services.s3.model.Tag; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -49,6 +51,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; @@ -205,11 +208,32 @@ public class PutS3Object extends AbstractS3Processor { .defaultValue(NO_SERVER_SIDE_ENCRYPTION) .build(); + public static final PropertyDescriptor OBJECT_TAGS_PREFIX = new PropertyDescriptor.Builder() + .name("s3-object-tags-prefix") + .displayName("Object Tags Prefix") + .description("Specifies the prefix which would be scanned against the incoming FlowFile's attributes and the matching attribute's " + + "name and value would be considered as the outgoing S3 object's Tag name and Tag value respectively. For Ex: If the " + + "incoming FlowFile carries the attributes tagS3country, tagS3PII, the tag prefix to be specified would be 'tagS3'") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor REMOVE_TAG_PREFIX = new PropertyDescriptor.Builder() + .name("s3-object-remove-tags-prefix") + .displayName("Remove Tag Prefix") + .description("If set to 'True', the value provided for '" + OBJECT_TAGS_PREFIX.getDisplayName() + "' will be removed from " + + "the attribute(s) and then considered as the Tag name. For ex: If the incoming FlowFile carries the attributes tagS3country, " + + "tagS3PII and the prefix is set to 'tagS3' then the corresponding tag values would be 'country' and 'PII'") + .allowableValues(new AllowableValue("true", "True"), new AllowableValue("false", "False")) + .defaultValue("false") + .build(); + public static final List properties = Collections.unmodifiableList( - Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, - FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, CANNED_ACL, SSL_CONTEXT_SERVICE, - ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, - SERVER_SIDE_ENCRYPTION, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD)); + Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, OBJECT_TAGS_PREFIX, REMOVE_TAG_PREFIX, + STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, + CANNED_ACL, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, + MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD)); final static String S3_BUCKET_KEY = "s3.bucket"; final static String S3_OBJECT_KEY = "s3.key"; @@ -415,6 +439,7 @@ public class PutS3Object extends AbstractS3Processor { * Then */ try { + final FlowFile flowFileCopy = flowFile; session.read(flowFile, new InputStreamCallback() { @Override public void process(final InputStream rawIn) throws IOException { @@ -471,6 +496,10 @@ public class PutS3Object extends AbstractS3Processor { request.withCannedAcl(cannedAcl); } + if (context.getProperty(OBJECT_TAGS_PREFIX).isSet()) { + request.setTagging(new ObjectTagging(getObjectTags(context, flowFileCopy))); + } + try { final PutObjectResult result = s3.putObject(request); if (result.getVersionId() != null) { @@ -562,6 +591,11 @@ public class PutS3Object extends AbstractS3Processor { if (cannedAcl != null) { initiateRequest.withCannedACL(cannedAcl); } + + if (context.getProperty(OBJECT_TAGS_PREFIX).isSet()) { + initiateRequest.setTagging(new ObjectTagging(getObjectTags(context, flowFileCopy))); + } + try { final InitiateMultipartUploadResult initiateResult = s3.initiateMultipartUpload(initiateRequest); @@ -785,6 +819,26 @@ public class PutS3Object extends AbstractS3Processor { } } + private List getObjectTags(ProcessContext context, FlowFile flowFile) { + final String prefix = context.getProperty(OBJECT_TAGS_PREFIX).evaluateAttributeExpressions(flowFile).getValue(); + final List objectTags = new ArrayList<>(); + final Map attributesMap = flowFile.getAttributes(); + + attributesMap.entrySet().stream().sequential() + .filter(attribute -> attribute.getKey().startsWith(prefix)) + .forEach(attribute -> { + String tagKey = attribute.getKey(); + String tagValue = attribute.getValue(); + + if (context.getProperty(REMOVE_TAG_PREFIX).asBoolean()) { + tagKey = tagKey.replace(prefix, ""); + } + objectTags.add(new Tag(tagKey, tagValue)); + }); + + return objectTags; + } + protected static class MultipartState implements Serializable { private static final long serialVersionUID = 9006072180563519740L; diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java index eaaaa8aa0d..d59093ba1d 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java @@ -23,8 +23,11 @@ import com.amazonaws.services.s3.model.CreateBucketRequest; import com.amazonaws.services.s3.model.DeleteBucketRequest; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.ObjectTagging; import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.Tag; import org.apache.nifi.util.file.FileUtils; import org.junit.AfterClass; import org.junit.Assert; @@ -38,6 +41,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.List; import static org.junit.Assert.fail; @@ -58,7 +62,7 @@ public abstract class AbstractS3IT { // when bucket is rapidly added/deleted and consistency propagation causes this error. // (Should not be necessary if REGION remains static, but added to prevent future frustration.) // [see http://stackoverflow.com/questions/13898057/aws-error-message-a-conflicting-conditional-operation-is-currently-in-progress] - protected final static String BUCKET_NAME = "test-bucket-00000000-0000-0000-0000-123456789021-" + REGION; + protected final static String BUCKET_NAME = "test-bucket-" + System.currentTimeMillis() + "-" + REGION; // Static so multiple Tests can use same client protected static AmazonS3Client client; @@ -144,6 +148,12 @@ public abstract class AbstractS3IT { client.putObject(putRequest); } + protected void putFileWithObjectTag(String key, File file, List objectTags) { + PutObjectRequest putRequest = new PutObjectRequest(BUCKET_NAME, key, file); + putRequest.setTagging(new ObjectTagging(objectTags)); + PutObjectResult result = client.putObject(putRequest); + } + protected Path getResourcePath(String resourceName) { Path path = null; diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java index e7a4482d3d..fbe17edc1e 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.aws.s3; +import com.amazonaws.services.s3.model.Tag; import org.apache.nifi.processors.aws.AbstractAWSProcessor; import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; import org.apache.nifi.util.MockFlowFile; @@ -24,6 +25,7 @@ import org.apache.nifi.util.TestRunners; import org.junit.Test; import java.io.IOException; +import java.util.ArrayList; import java.util.List; @@ -64,7 +66,7 @@ public class ITListS3 extends AbstractS3IT { runner.addControllerService("awsCredentialsProvider", serviceImpl); - runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties"); + runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.enableControllerService(serviceImpl); runner.assertValid(serviceImpl); @@ -142,4 +144,33 @@ public class ITListS3 extends AbstractS3IT { flowFiles.get(0).assertAttributeEquals("filename", "b/c"); } + @Test + public void testObjectTagsWritten() { + List objectTags = new ArrayList<>(); + objectTags.add(new Tag("dummytag1", "dummyvalue1")); + objectTags.add(new Tag("dummytag2", "dummyvalue2")); + + putFileWithObjectTag("b/fileWithTag", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME), objectTags); + + final TestRunner runner = TestRunners.newTestRunner(new ListS3()); + + runner.setProperty(ListS3.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(ListS3.PREFIX, "b/"); + runner.setProperty(ListS3.REGION, REGION); + runner.setProperty(ListS3.BUCKET, BUCKET_NAME); + runner.setProperty(ListS3.WRITE_OBJECT_TAGS, "true"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1); + + MockFlowFile flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS).get(0); + + flowFiles.assertAttributeEquals("filename", "b/fileWithTag"); + flowFiles.assertAttributeExists("s3.tag.dummytag1"); + flowFiles.assertAttributeExists("s3.tag.dummytag2"); + flowFiles.assertAttributeEquals("s3.tag.dummytag1", "dummyvalue1"); + flowFiles.assertAttributeEquals("s3.tag.dummytag2", "dummyvalue2"); + } + } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java index 2832e29139..2db139a0dd 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java @@ -28,8 +28,11 @@ import java.util.Map; import java.util.regex.Pattern; import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.model.GetObjectTaggingRequest; +import com.amazonaws.services.s3.model.GetObjectTaggingResult; import com.amazonaws.services.s3.model.ListMultipartUploadsRequest; import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.Tag; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -185,7 +188,7 @@ public class ITPutS3Object extends AbstractS3IT { runner.addControllerService("awsCredentialsProvider", serviceImpl); - runner.setProperty(serviceImpl, AbstractAWSCredentialsProviderProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties"); + runner.setProperty(serviceImpl, AbstractAWSCredentialsProviderProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.enableControllerService(serviceImpl); runner.assertValid(serviceImpl); @@ -840,6 +843,35 @@ public class ITPutS3Object extends AbstractS3IT { Assert.assertEquals(0, uploadList.getMultipartUploads().size()); } + @Test + public void testObjectTags() throws IOException, InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new PutS3Object()); + runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutS3Object.REGION, REGION); + runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); + runner.setProperty(PutS3Object.OBJECT_TAGS_PREFIX, "tagS3"); + runner.setProperty(PutS3Object.REMOVE_TAG_PREFIX, "true"); + + final Map attrs = new HashMap<>(); + attrs.put("filename", "tag-test.txt"); + attrs.put("tagS3PII", "true"); + runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs); + + runner.run(); + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); + + GetObjectTaggingResult result = client.getObjectTagging(new GetObjectTaggingRequest(BUCKET_NAME, "tag-test.txt")); + List objectTags = result.getTagSet(); + + for (Tag tag : objectTags) { + System.out.println("Tag Key : " + tag.getKey() + ", Tag Value : " + tag.getValue()); + } + + Assert.assertTrue(objectTags.size() == 1); + Assert.assertEquals("PII", objectTags.get(0).getKey()); + Assert.assertEquals("true", objectTags.get(0).getValue()); + } + private class MockAmazonS3Client extends AmazonS3Client { MultipartUploadListing listing; public void setListing(MultipartUploadListing newlisting) { diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java index 48210b94d3..be12d6db80 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import com.amazonaws.services.s3.model.GetObjectTaggingRequest; import org.apache.commons.lang3.time.DateUtils; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.Scope; @@ -291,17 +292,45 @@ public class TestListS3 { runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER); } + @Test + public void testWriteObjectTags() { + runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(ListS3.BUCKET, "test-bucket"); + runner.setProperty(ListS3.WRITE_OBJECT_TAGS, "true"); + + Date lastModified = new Date(); + ObjectListing objectListing = new ObjectListing(); + S3ObjectSummary objectSummary1 = new S3ObjectSummary(); + objectSummary1.setBucketName("test-bucket"); + objectSummary1.setKey("a"); + objectSummary1.setLastModified(lastModified); + objectListing.getObjectSummaries().add(objectSummary1); + + Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing); + + runner.run(); + + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(GetObjectTaggingRequest.class); + Mockito.verify(mockS3Client, Mockito.times(1)).getObjectTagging(captureRequest.capture()); + GetObjectTaggingRequest request = captureRequest.getValue(); + + assertEquals("test-bucket", request.getBucketName()); + assertEquals("a", request.getKey()); + Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any()); + } + @Test public void testGetPropertyDescriptors() throws Exception { ListS3 processor = new ListS3(); List pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 20, pd.size()); + assertEquals("size should be eq", 21, pd.size()); assertTrue(pd.contains(ListS3.ACCESS_KEY)); assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(ListS3.BUCKET)); assertTrue(pd.contains(ListS3.CREDENTIALS_FILE)); assertTrue(pd.contains(ListS3.ENDPOINT_OVERRIDE)); assertTrue(pd.contains(ListS3.REGION)); + assertTrue(pd.contains(ListS3.WRITE_OBJECT_TAGS)); assertTrue(pd.contains(ListS3.SECRET_KEY)); assertTrue(pd.contains(ListS3.SIGNER_OVERRIDE)); assertTrue(pd.contains(ListS3.SSL_CONTEXT_SERVICE)); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java index 747da00907..2c3db81140 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import com.amazonaws.services.s3.model.Tag; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -71,26 +72,8 @@ public class TestPutS3Object { @Test public void testPutSinglePart() { - runner.setProperty(PutS3Object.REGION, "ap-northeast-1"); - runner.setProperty(PutS3Object.BUCKET, "test-bucket"); runner.setProperty("x-custom-prop", "hello"); - final Map ffAttributes = new HashMap<>(); - ffAttributes.put("filename", "testfile.txt"); - runner.enqueue("Test Content", ffAttributes); - - PutObjectResult putObjectResult = Mockito.spy(PutObjectResult.class); - Date expiration = new Date(); - putObjectResult.setExpirationTime(expiration); - putObjectResult.setMetadata(new ObjectMetadata()); - putObjectResult.setVersionId("test-version"); - Mockito.when(putObjectResult.getETag()).thenReturn("test-etag"); - Mockito.when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(putObjectResult); - MultipartUploadListing uploadListing = new MultipartUploadListing(); - Mockito.when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing); - Mockito.when(mockS3Client.getResourceUrl(Mockito.anyString(), Mockito.anyString())).thenReturn("test-s3-url"); - - runner.assertValid(); - runner.run(1); + testBase(); ArgumentCaptor captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class); Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture()); @@ -98,8 +81,10 @@ public class TestPutS3Object { assertEquals("test-bucket", request.getBucketName()); runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS); MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeEquals(CoreAttributes.FILENAME.key(), "testfile.txt"); ff0.assertAttributeEquals(PutS3Object.S3_ETAG_ATTR_KEY, "test-etag"); ff0.assertAttributeEquals(PutS3Object.S3_VERSION_ATTR_KEY, "test-version"); @@ -147,11 +132,55 @@ public class TestPutS3Object { } } + @Test + public void testObjectTags() { + runner.setProperty(PutS3Object.OBJECT_TAGS_PREFIX, "tagS3"); + runner.setProperty(PutS3Object.REMOVE_TAG_PREFIX, "false"); + testBase(); + + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class); + Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture()); + PutObjectRequest request = captureRequest.getValue(); + + List tagSet = request.getTagging().getTagSet(); + + assertEquals(1, tagSet.size()); + assertEquals("tagS3PII", tagSet.get(0).getKey()); + assertEquals("true", tagSet.get(0).getValue()); + + } + + private void testBase() { + runner.setProperty(PutS3Object.REGION, "ap-northeast-1"); + runner.setProperty(PutS3Object.BUCKET, "test-bucket"); + + final Map ffAttributes = new HashMap<>(); + ffAttributes.put("filename", "testfile.txt"); + ffAttributes.put("tagS3PII", "true"); + runner.enqueue("Test Content", ffAttributes); + + PutObjectResult putObjectResult = Mockito.spy(PutObjectResult.class); + Date expiration = new Date(); + + putObjectResult.setExpirationTime(expiration); + putObjectResult.setMetadata(new ObjectMetadata()); + putObjectResult.setVersionId("test-version"); + + Mockito.when(putObjectResult.getETag()).thenReturn("test-etag"); + Mockito.when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(putObjectResult); + + MultipartUploadListing uploadListing = new MultipartUploadListing(); + Mockito.when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing); + + runner.assertValid(); + runner.run(1); + } + @Test public void testGetPropertyDescriptors() throws Exception { PutS3Object processor = new PutS3Object(); List pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 31, pd.size()); + assertEquals("size should be eq", 33, pd.size()); assertTrue(pd.contains(PutS3Object.ACCESS_KEY)); assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(PutS3Object.BUCKET)); @@ -178,6 +207,8 @@ public class TestPutS3Object { assertTrue(pd.contains(PutS3Object.PROXY_HOST_PORT)); assertTrue(pd.contains(PutS3Object.PROXY_USERNAME)); assertTrue(pd.contains(PutS3Object.PROXY_PASSWORD)); + assertTrue(pd.contains(PutS3Object.OBJECT_TAGS_PREFIX)); + assertTrue(pd.contains(PutS3Object.REMOVE_TAG_PREFIX)); } }